postgre工具

2/10/2023 postgre

# python连接pg

#!/usr/bin/python
# -*- encoding: utf-8 -*-
import numpy as np
import pandas as pd
import psycopg2
import re


def format_sql(sql):
    sql = re.sub("\\s+", " ", sql)
    print(sql)
    return sql


class Pg:
    def __init__(self, host, port, dbname, user, password):
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password
        self.client = psycopg2.connect(host=self.host, port=self.port, dbname=self.dbname, user=self.user,
                                       password=self.password)
        # self.client.autocommit = True

    def query_df(self, sql, columns):
        """
        查询后以dataframe形式返回
        :param sql:
        :param columns:
        :return:
        """
        return pd.DataFrame(self.query(format_sql(sql)), columns=columns)

    def query(self, sql):
        with self.client.cursor() as cursor:
            cursor.execute(format_sql(sql))
            return cursor.fetchall()

    def update(self, sql):
        """
        查询后以dataframe形式返回
        :param sql:
        :return:
        """
        with self.client.cursor() as cursor:
            cursor.execute(format_sql(sql))
        self.client.commit()

    def update_batch(self, sql, df):
        """
        批量更新
        :param sql:
        :return:
        """
        with self.client.cursor() as cursor:
            cursor.executemany(format_sql(sql), np.array(df).tolist())
        self.client.commit()

    def close(self):
        self.client.close()


if __name__ == '__main__':
    pg = Pg("", "", "", "", "")
    # 批量更新
    # self.client.cursor().executemany()
    print(pg.query("select * from test_table limit 10"))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# pg表创建语句

# 分区表及索引创建

drop table "public"."test_table";
CREATE TABLE "public"."test_table" (
"collector"           varchar ,
"pub_type"            varchar ,
"pro_id"              integer NOT NULL,
"pub_id"              integer NOT NULL,
"root_pro_id"         integer NOT NULL,
"root_pub_id"         integer NOT NULL,
"fee_type"            integer ,
"recipient_type"      integer ,
"blk_num"             integer NOT NULL,
"hash"                varchar NOT NULL,
"log_idx"             integer NOT NULL,
"idx"                 integer NOT NULL,
"mirr_addr"           varchar,
"timestamp"           integer,
"amount"              float8,
"currency"            varchar,
"referralFee"         integer,
"origin_addr"         varchar,
"decimals"            integer,
"pform_addr"          varchar,
"pform_rate"          integer,
"plt_amount"          float8,
"mirr_amount"         float8,
"origin_amount"       float8,
PRIMARY KEY (pro_id, hash,log_idx)
) PARTITION BY HASH (pro_id);






CREATE INDEX "test_table_pro_id_idx" ON "public"."test_table" ("pro_id");
CREATE INDEX "test_table_pub_id_idx" ON "public"."test_table" ("pub_id");
CREATE INDEX "test_table_root_pro_id_idx" ON "public"."test_table" ("root_pro_id");
CREATE INDEX "test_table_root_pub_id_idx" ON "public"."test_table" ("root_pub_id");
CREATE INDEX "test_table_fee_type_idx" ON "public"."test_table" ("fee_type");
CREATE INDEX "test_table_recipient_type_idx" ON "public"."test_table" ("recipient_type");
CREATE INDEX "test_table_timestamp_idx" ON "public"."test_table" ("timestamp");


CREATE INDEX "test_table_collector_idx" ON "public"."test_table" USING btree ("collector" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST);
CREATE INDEX "test_table_pub_type_idx" ON "public"."test_table" USING btree ("pub_type" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST);
CREATE INDEX "test_table_mirr_addr_idx" ON "public"."test_table" USING btree ("mirr_addr" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST);
CREATE INDEX "test_table_origin_addr_idx" ON "public"."test_table" USING btree ("origin_addr" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST);
CREATE INDEX "test_table_pform_addr_idx" ON "public"."test_table" USING btree ("pform_addr" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST);


do $$  
declare
parts int := 20;
begin
for i in 0..parts-1 loop    
    execute format('CREATE TABLE test_table_%s PARTITION OF test_table FOR VALUES WITH (MODULUS %s,REMAINDER %s);', i, parts, i);

end loop;
end;  
$$;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Last Updated: 6/1/2024, 6:36:28 AM