Reading Parquet Files

10/10/2023

# ReadParquet

```java
package com.person.example.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Version;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReadStore;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Reads a Parquet file record by record (example Group API) and column by column (low-level column API).
 * @Description https://blog.csdn.net/weixin_43369296/article/details/132837895
 */
public class ReadParquet {
    public static List<Map<String, Object>> readParquetFileWithGroup(String filePath) throws IOException {
        List<Map<String, Object>> recordList = new ArrayList<>();
        //  Open the Parquet file; to read from HDFS, also add conf.set("fs.defaultFS", "hdfs://xxxxx")
        ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), new Configuration()));
        //  Get the schema from the file footer metadata
        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
        List<Type> fields = schema.getFields();
        PageReadStore pages;
        //  Iterate over row groups; small files usually have a single row group, larger files may have several
        while ((pages = reader.readNextRowGroup()) != null) {
            long rows = pages.getRowCount();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
            //  Iterate over rows
            for (int i = 0; i < rows; i++) {
                Map<String, Object> recordMap = new HashMap<>();
                SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
                //  Iterate over the columns and collect each value
                fields.stream().forEach(item -> {
                    final String name = item.getName();
                    //  Look up the field index by column name and read the value
                    final int fieldIndex = simpleGroup.getType().getFieldIndex(name);
                    final String valueToString = simpleGroup.getValueToString(fieldIndex, 0);
                    recordMap.put(name, valueToString);
                });
                recordList.add(recordMap);
            }
        }
        reader.close();
        return recordList;
    }

    /**
     * Column-wise read: returns a map from column name to the list of values in that column.
     * @param filePath path to the Parquet file
     * @return column name -> values rendered as strings
     * @throws IOException if the file cannot be read
     */
    public static Map<String, List<String>> readParquetFileWithColumn(String filePath) throws IOException {
        Map<String, List<String>> columnMap = new HashMap<>();
        // To read from HDFS, also add conf.set("fs.defaultFS", "hdfs://xxxxx")
        Configuration conf = new Configuration();
        final Path path = new Path(filePath);
        ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
        //  Iterate over row groups
        PageReadStore rowGroup = null;
        while (null != (rowGroup = r.readNextRowGroup())) {
            ColumnReader colReader = null;
            //  Build a read store for the columns in this row group
            ColumnReadStore colReadStore = new ColumnReadStoreImpl(rowGroup, new GroupRecordConverter(schema).getRootConverter(), schema, Version.FULL_VERSION);
            List<ColumnDescriptor> descriptorList = schema.getColumns();
            // Iterate over columns
            for (ColumnDescriptor colDescriptor : descriptorList) {
                String[] columnNamePath = colDescriptor.getPath();
                //  Column name (the column path rendered via Arrays.toString)
                String columnName = Arrays.toString(columnNamePath);
                colReader = colReadStore.getColumnReader(colDescriptor);
                //  Total number of values in this column chunk
                long totalValuesInColumnChunk = rowGroup.getPageReader(colDescriptor).getTotalValueCount();
                // Get the physical column type and call the matching getter; only INT32/INT64 are handled explicitly here, everything else falls through to the binary/UTF-8 branch
                PrimitiveType.PrimitiveTypeName type = colDescriptor.getType();
                final String name = type.name();
                List<String> columnList = new ArrayList<>();
                columnMap.put(columnName, columnList);
                // Iterate over each value in the column chunk
                for (int i = 0; i < totalValuesInColumnChunk; i++) {
                    String val = "";
                    if (name.equals("INT32")) {
                        val = String.valueOf(colReader.getInteger());
                    } else if (name.equals("INT64")) {
                        val = String.valueOf(colReader.getLong());
                    } else {
                        val = colReader.getBinary().toStringUsingUTF8();
                    }
                    columnList.add(val);
                    colReader.consume();
                }
            }
        }
        r.close();
        return columnMap;
    }

    public static void main(String[] args) throws IOException {
        String filePath = "D:/e/tmp/parquet1/dt=2023-10-09/part-0-0";
        System.out.println(readParquetFileWithGroup(filePath));
    }
}
```

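The imports in the snippet above also pull in AvroParquetReader and GenericRecord, but the matching method is not shown. Below is a minimal sketch of reading the same file through the Avro API; the class and method names (ReadParquetAvro, readParquetFileWithAvro) are placeholders rather than code from the post:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadParquetAvro {
    /** Reads every record into a field-name -> value map using the Avro read path. */
    public static List<Map<String, Object>> readParquetFileWithAvro(String filePath) throws IOException {
        List<Map<String, Object>> recordList = new ArrayList<>();
        Configuration conf = new Configuration(); // set fs.defaultFS here to read from HDFS
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(new Path(filePath), conf))
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                Map<String, Object> recordMap = new HashMap<>();
                // The Avro schema is derived from the Parquet schema, so field names match the columns
                for (Schema.Field field : record.getSchema().getFields()) {
                    recordMap.put(field.name(), record.get(field.name()));
                }
                recordList.add(recordMap);
            }
        }
        return recordList;
    }
}
```

Note that Avro returns string columns as org.apache.avro.util.Utf8 values; call toString() on them if plain Java strings are needed.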

# flink1.12

```java
package com.jac.test.demo;

import com.jac.jpilot.bigdata.flink.Job;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.types.Row;
import org.apache.parquet.schema.*;

import java.io.Serializable;
import java.util.ArrayList;

/**
 * @Author zhouyong
 * @File TestHdfsParquetSink
 * @Time 2023/9/27 15:05
 * @Description Reads Parquet files from a directory with Flink 1.12's ParquetRowInputFormat and maps each Row to a DBBattMonitor.
 */
public class TestHdfsParquetSink {
    private static final String s = "1";

    public static void main(String[] args) throws Exception {
        // test();
        StreamExecutionEnvironment env = Job.getEnv(args);
        env.setParallelism(1);


        final String pathString = "C:\\Users\\20231220\\Downloads\\1\\";
        final ArrayList<Type> cityFields = new ArrayList<>();
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "unique_id"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "vin"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "event_time"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "command_type"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "protocol_vehicle_type"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "hdfs_created_at"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "kafka_time"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT64, "kafka_offset"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.BINARY, "kafka_topic"));
        cityFields.add(new PrimitiveType(Type.Repetition.REQUIRED, PrimitiveType.PrimitiveTypeName.INT32, "kafka_partition"));

        final ParquetRowInputFormat cityFormat = new ParquetRowInputFormat(new Path(pathString), new MessageType("", cityFields));
        final SingleOutputStreamOperator<DBBattMonitor> parquetDataSet = env.readFile(cityFormat, pathString
                        , FileProcessingMode.PROCESS_CONTINUOUSLY, 60 * 60 * 1000)
                .map(new MapFunction<Row, DBBattMonitor>() {
                    @Override
                    public DBBattMonitor map(Row value) throws Exception {
                        DBBattMonitor dbBattMonitor = new DBBattMonitor();
                        dbBattMonitor.setUnique_id(value.getField(0).toString());
                        dbBattMonitor.setVin(value.getField(1).toString());
                        dbBattMonitor.setEvent_time(value.getField(2).toString());
                        dbBattMonitor.setCommand_type(value.getField(3).toString());
                        dbBattMonitor.setProtocol_vehicle_type(value.getField(4).toString());
                        dbBattMonitor.setHdfs_created_at(value.getField(5).toString());
                        dbBattMonitor.setKafka_time(value.getField(6).toString());
                        dbBattMonitor.setKafka_offset(Long.parseLong(value.getField(7).toString()));
                        dbBattMonitor.setKafka_topic(value.getField(8).toString());
                        dbBattMonitor.setKafka_partition(Integer.parseInt(value.getField(9).toString()));
                        return dbBattMonitor;
                    }
                })
                .returns(DBBattMonitor.class)
                .name("getCityConfig");


        parquetDataSet.print();


        env.execute("");
    }

    /**
     * Test datum.
     */
    public static class Datum implements Serializable {

        public String a;
        public int b;

        public Datum() {
        }

        public Datum(String a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            Datum datum = (Datum) o;
            return b == datum.b && (a != null ? a.equals(datum.a) : datum.a == null);
        }

        @Override
        public int hashCode() {
            int result = a != null ? a.hashCode() : 0;
            result = 31 * result + b;
            return result;
        }
    }


}
```

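The DBBattMonitor class used by the mapper is not included in the post. The sketch below only mirrors the setters called above, with field types inferred from the mapper (strings everywhere except kafka_offset and kafka_partition); the real class may look different:

```java
import java.io.Serializable;

// Hypothetical stand-in for the DBBattMonitor referenced above; getters omitted for brevity.
public class DBBattMonitor implements Serializable {
    private String unique_id;
    private String vin;
    private String event_time;
    private String command_type;
    private String protocol_vehicle_type;
    private String hdfs_created_at;
    private String kafka_time;
    private long kafka_offset;
    private String kafka_topic;
    private int kafka_partition;

    public void setUnique_id(String unique_id) { this.unique_id = unique_id; }
    public void setVin(String vin) { this.vin = vin; }
    public void setEvent_time(String event_time) { this.event_time = event_time; }
    public void setCommand_type(String command_type) { this.command_type = command_type; }
    public void setProtocol_vehicle_type(String protocol_vehicle_type) { this.protocol_vehicle_type = protocol_vehicle_type; }
    public void setHdfs_created_at(String hdfs_created_at) { this.hdfs_created_at = hdfs_created_at; }
    public void setKafka_time(String kafka_time) { this.kafka_time = kafka_time; }
    public void setKafka_offset(long kafka_offset) { this.kafka_offset = kafka_offset; }
    public void setKafka_topic(String kafka_topic) { this.kafka_topic = kafka_topic; }
    public void setKafka_partition(int kafka_partition) { this.kafka_partition = kafka_partition; }

    @Override
    public String toString() {
        return "DBBattMonitor{unique_id=" + unique_id + ", vin=" + vin + ", event_time=" + event_time
                + ", kafka_offset=" + kafka_offset + ", kafka_partition=" + kafka_partition + "}";
    }
}
```

For Flink to treat this as a proper POJO type (instead of falling back to Kryo serialization), public getters and a public no-argument constructor are also required.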

# flink1.14

Gitee example (external link).
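The linked example is not reproduced here. As a rough sketch (not the Gitee code), Flink 1.14 can read Parquet through the Table API's filesystem connector with format = 'parquet'; the table name, columns, and path below are placeholders:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadParquetFlink114 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Placeholder schema and path; flink-parquet and the Hadoop client must be on the classpath.
        tableEnv.executeSql(
                "CREATE TABLE parquet_source (" +
                "  unique_id STRING," +
                "  vin STRING," +
                "  event_time STRING" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'file:///tmp/parquet1/dt=2023-10-09/'," +
                "  'format' = 'parquet'" +
                ")");

        // executeSql on a SELECT submits the job itself, so no explicit env.execute() is needed here.
        tableEnv.executeSql("SELECT * FROM parquet_source").print();
    }
}
```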

Last Updated: 6/1/2024, 6:36:28 AM