使用反射获取 Flink 的 DataType、LogicalType
舟率率 10/9/2023 flink
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
/**
 * Walks the reflected field map of {@code Person} and converts every field type
 * to a Flink {@code LogicalType}, printing each result.
 *
 * @throws Exception if {@code toDataType} rejects a field type it has no mapping for
 */
@Test
public void test03() throws Exception {
    Map<String, String> fieldMap = getFieldMap(Person.class);
    // Convert step by step from the field map; the entry under key "0" holds
    // the class name and must be removed before the fields are iterated.
    String className = fieldMap.remove("0");
    System.out.println("类名:" + className);
    for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
        String fieldName = entry.getKey();
        LogicalType fieldType = toDataType(entry.getValue()).getLogicalType();
        // Print the mapping so the conversion result is observable rather than discarded.
        System.out.println(fieldName + " -> " + fieldType);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Builds a map from declared field name to lower-cased simple type name for a class.
 *
 * <p>The simple name of the class itself is stored under the reserved key {@code "0"}.
 * A Java identifier cannot start with a digit, so a field declared in source cannot
 * collide with that key. Entries are kept in insertion order: the class-name entry
 * first, then the fields in whatever order {@link Class#getDeclaredFields()} returns
 * them (the JDK does not guarantee that order).
 *
 * <p>Type names are lower-cased with {@code Locale.ROOT}, so the result does not depend
 * on the JVM default locale (for example {@code Integer} always becomes {@code integer}).
 * Only field metadata is read, so no field is made accessible.
 *
 * @param clazz class whose declared fields are inspected
 * @return mutable map of field name to lower-cased simple type name, plus the
 *         {@code "0"} entry holding the class's simple name
 */
public static Map<String, String> getFieldMap(Class<?> clazz){
    Map<String, String> fieldMap = new java.util.LinkedHashMap<>();
    fieldMap.put("0", clazz.getSimpleName());
    for (Field field : clazz.getDeclaredFields()) {
        // Locale.ROOT keeps the names stable under locales with special casing rules.
        String type = field.getType().getSimpleName().toLowerCase(java.util.Locale.ROOT);
        fieldMap.put(field.getName(), type);
    }
    return fieldMap;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Maps a lower-cased Java simple type name to the matching Flink {@link DataType}.
 *
 * <p>Only the string, boolean and numeric names handled by the switch below are
 * supported. The temporal entries in the reference list are NOT handled here and
 * fall through to the error branch.
 *
 * <p>Reference: default mappings registered in
 * {@code org.apache.flink.table.types.utils.ClassDataTypeConverter}:
 * <pre>
 * addDefaultDataType(String.class, DataTypes.STRING());
 * addDefaultDataType(Boolean.class, DataTypes.BOOLEAN());
 * addDefaultDataType(boolean.class, DataTypes.BOOLEAN());
 * addDefaultDataType(Byte.class, DataTypes.TINYINT());
 * addDefaultDataType(byte.class, DataTypes.TINYINT());
 * addDefaultDataType(Short.class, DataTypes.SMALLINT());
 * addDefaultDataType(short.class, DataTypes.SMALLINT());
 * addDefaultDataType(Integer.class, DataTypes.INT());
 * addDefaultDataType(int.class, DataTypes.INT());
 * addDefaultDataType(Long.class, DataTypes.BIGINT());
 * addDefaultDataType(long.class, DataTypes.BIGINT());
 * addDefaultDataType(Float.class, DataTypes.FLOAT());
 * addDefaultDataType(float.class, DataTypes.FLOAT());
 * addDefaultDataType(Double.class, DataTypes.DOUBLE());
 * addDefaultDataType(double.class, DataTypes.DOUBLE());
 * addDefaultDataType(java.sql.Date.class, DataTypes.DATE());
 * addDefaultDataType(java.time.LocalDate.class, DataTypes.DATE());
 * addDefaultDataType(java.sql.Time.class, DataTypes.TIME(0));
 * addDefaultDataType(java.time.LocalTime.class, DataTypes.TIME(9));
 * addDefaultDataType(java.sql.Timestamp.class, DataTypes.TIMESTAMP(9));
 * addDefaultDataType(java.time.LocalDateTime.class, DataTypes.TIMESTAMP(9));
 * addDefaultDataType(java.time.OffsetDateTime.class, DataTypes.TIMESTAMP_WITH_TIME_ZONE(9));
 * addDefaultDataType(java.time.Instant.class, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9));
 * addDefaultDataType(java.time.Duration.class, DataTypes.INTERVAL(DataTypes.SECOND(9)));
 * addDefaultDataType(java.time.Period.class, DataTypes.INTERVAL(DataTypes.YEAR(4), DataTypes.MONTH()));
 * </pre>
 *
 * @param fieldType lower-cased simple type name, e.g. {@code "string"} or {@code "int"}
 * @return the Flink data type mapped to {@code fieldType}
 * @throws Exception if {@code fieldType} is {@code null} or has no mapping; the message
 *         names the rejected value
 */
private static DataType toDataType(String fieldType)throws Exception{
    // A switch on a null String would raise a bare NullPointerException;
    // report it through the same failure path as any other unsupported type.
    if (fieldType == null) {
        throw new Exception("unknown field type: null");
    }
    switch (fieldType) {
        case "string":
            return DataTypes.STRING();
        case "boolean":
            return DataTypes.BOOLEAN();
        case "byte":
            return DataTypes.TINYINT();
        case "short":
            return DataTypes.SMALLINT();
        case "integer":
        case "int":
            return DataTypes.INT();
        case "long":
            return DataTypes.BIGINT();
        case "float":
            return DataTypes.FLOAT();
        case "double":
            return DataTypes.DOUBLE();
        default:
            // Include the offending name so the failing field can be identified.
            throw new Exception("unknown field type: " + fieldType);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * Sample serializable bean used as the reflection target in this example.
 *
 * <p>Its fields cover the boxed types String, Integer, Boolean, Byte, Short, Float
 * and Double; there is no Long field. Accessors, constructors and toString are
 * presumably generated by Lombok from the annotations below; the annotation
 * imports are not visible here, so confirm.
 */
@Setter
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public static class Person implements Serializable {
// String-typed field.
private String name;
// Integer-typed field.
private Integer age;
// Boolean-typed field.
private Boolean sex;
// The sex1..sex5 names look like placeholders that only exist to exercise
// further boxed types (Byte, Short, Integer, Float, Double); verify intent.
private Byte sex1;
private Short sex2;
private Integer sex3;
private Float sex4;
private Double sex5;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16