基于反射获取类信息,实现JdbcSink快速赋值
舟率率 11/9/2023 flink
public static class Column {
private String name;
private String jsonName;
private String type;
}
/**
* priorityJsonProperty:是否优先使用jsonProperty
*
* @param clazz
* @return
*/
public static List<Column> getColumnList(Class<?> clazz) {
List<Column> columnList = new ArrayList<>();
for (Field field : clazz.getDeclaredFields()) {
Column column = new Column();
String jsonName = Optional.ofNullable(field.getAnnotation(JsonProperty.class))
.map(JsonProperty::value)
.orElseGet(field::getName);
String name = field.getName();
if (jsonName.contains("$") || name.contains("$")) continue;
String type = field.getType().getSimpleName().toLowerCase();
column.setType(type);
column.setName(name);
column.setJsonName(jsonName);
columnList.add(column);
}
return columnList;
}
/**
* 方便JdbcSink在进行大批量字段执行时,快速开发准备的
* columnTypes必须全是小写
* Timestamp 要求要将值转换成时间戳
*
* @param preparedStatement
* @param columnList
* @param valMap
* @throws Exception
*/
@SneakyThrows
public static void executeBatch(PreparedStatement preparedStatement, List<Column> columnList, Map<String, String> valMap) {
for (int i = 0; i < columnList.size(); i++) {
Column column = columnList.get(i);
String value = Optional.ofNullable(valMap.get(column.getJsonName())).filter(x -> !EnvConf.NULL.equals(x)).orElse(null);
switch (column.getType()) {
case "int":
case "integer":
case "smallint":
case "tinyint":
if (value == null) preparedStatement.setNull(i + 1, Types.INTEGER);
else preparedStatement.setInt(i + 1, Integer.parseInt(value));
break;
case "long":
case "bigint":
if (value == null) preparedStatement.setNull(i + 1, Types.BIGINT);
else preparedStatement.setLong(i + 1, Long.parseLong(value));
break;
case "double":
case "float":
if (value == null) preparedStatement.setNull(i + 1, Types.DOUBLE);
else preparedStatement.setDouble(i + 1, Double.parseDouble(value));
break;
case "str":
case "string":
case "char":
case "text":
case "longtext":
case "varchar":
case "binary":
if (value == null) preparedStatement.setNull(i + 1, Types.VARCHAR);
else preparedStatement.setString(i + 1, value);
break;
case "bool":
if (value == null) preparedStatement.setNull(i + 1, Types.BOOLEAN);
else preparedStatement.setBoolean(i + 1, Boolean.parseBoolean(value));
break;
case "timestamp":
if (value == null) preparedStatement.setNull(i + 1, Types.TIMESTAMP);
else preparedStatement.setTimestamp(i + 1, new Timestamp(Long.parseLong(value)));
break;
default:
throw new Exception("columnType is unknown");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89