sqltools

cxoracle

CxOracle

class dramkit.sqltools.cxoracle.CxOracle(host='localhost', user='test', password='xxxxxxxxxxx', database='orclpdb', port=1521, logger=None, **kwargs)

Bases: object

clear_data(tb_name)
copy()
create_table(tb_name, cols_info, idcols=None, force=False)
df_to_sql(df, tb_name, act_type='insert', cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', **kwargs_cols)
drop_table(tb_name, purge=True)
execute_sql(sql_str, to_df=True)
get_create_table_sql(tb_name)
get_fields(tb_name)
get_tables()
has_table(tb_name)

get_conn

dramkit.sqltools.cxoracle.get_conn(host='localhost', user='test', password=None, database='orclpdb', port=1521, **kwargs)

连接数据库

Examples

>>> host, user = 'localhost', 'test',
>>> database, port = 'orclpdb', 1521
>>> password = 'xxxxxxxxxxx'
>>> con1 = get_conn(host=host, user=user,
...                 password=password,
...                 database=database, port=port)
>>> host, user = 'localhost', 'c##test',
>>> database, port = 'orcl', 1521
>>> password = 'xxxxxxxxxxx'
>>> con2 = get_conn(host=host, user=user,
...                 password=password,
...                 database=database, port=port)

execute_sql

dramkit.sqltools.cxoracle.execute_sql(conn, sql_str, to_df=True)

执行sql语句并返回结果

Examples

>>> conn = _get_test_conn()
>>> df1 = execute_sql(conn, 'select * from test1')
>>> # execute_sql(conn, 'drop table test2 purge')
>>> execute_sql(conn,
...             """create table test2
...             (code varchar(20), year int, val1 float)""")
>>> df2 = execute_sql(conn, 'select * from test2')
>>> execute_sql(conn,
                """insert into test2 values
                ('a', 2022, 1)""")
>>> df2 = execute_sql(conn, 'select * from test2')

get_fields

dramkit.sqltools.cxoracle.get_fields(conn, tb_name)

获取表字段名列表

clear_data

dramkit.sqltools.cxoracle.clear_data(conn, tb_name)

清空表中数据

has_table

dramkit.sqltools.cxoracle.has_table(conn, tb_name)

copy_table

dramkit.sqltools.cxoracle.copy_table(conn, tb_name, with_data=False)

复制表

create_table

dramkit.sqltools.cxoracle.create_table(conn, tb_name, cols_info, idcols=None, force=False)

新建表

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> idcols = None
>>> idcols = ['a', 'c']
>>> create_table(conn, 'test2',
...              ('a VARCHAR2(255)',
...               'b FLOAT DEFAULT 1 NOT NULL',
...               'c DATE'),
...              idcols=idcols, force=True)

get_create_table_sql

dramkit.sqltools.cxoracle.get_create_table_sql(conn, tb_name)

查询表创建语句

drop_table

dramkit.sqltools.cxoracle.drop_table(conn, tb_name, purge=True)

删除表

get_tables

dramkit.sqltools.cxoracle.get_tables(conn)

get_cols_info_df

dramkit.sqltools.cxoracle.get_cols_info_df(df, cols=None, col_types={}, all2str=False, big_text_cols=[])

根据pd.DataFrame中的列cols,识别对应列在Oracle中的字段类别

Parameters:
  • df (pandas.DataFrame) – 待识别数据

  • cols (list, None) – 待识别列名列表,默认所有列

  • col_types (dict) – 指定列类型,如{‘col1’: ‘VARCHAR2(20)’, ‘col2’: ‘NUMBER(10,0)’},指定的列不做判断,直接返回

  • all2str (bool) – 若为True,则数据均按文本类型处理

  • big_text_cols (str, list) – 文本是否为长文本,若为’all’,则全部按长文本处理,若为list,则list指定列按长文本处理

Returns:

  • cols_info (str) – 列类型信息,格式如’col1 col1_type, col2 col2_type, …’

  • dtype (dict) – 字典格式的类类型信息

  • placeholder (str) – 占位符信息,格式如’:1, :2, …’

References

https://blog.csdn.net/tonydz0523/article/details/82529941

merge_into

dramkit.sqltools.cxoracle.merge_into(conn, tb_tgt, tb_src, cols, idcols, rep_keep='src')

df_to_sql

dramkit.sqltools.cxoracle.df_to_sql(df, conn, tb_name, act_type='insert', cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', **kwargs_cols)

把pandas.DataFrame存入Oracle数据库中

Parameters:
  • df (pandas.DataFrame) – 待存数据

  • conn (cx_oracle.connect) – cx_oracle.connect数据库连接对象

  • tb_name (str) – 存入的表名

  • act_type (str) –

    存入方式:
    若为’ignore_tb’,则当表已经存在时不进行任何操作
    若为’new’,则新建表(若原表已存在,则会先删除再重建)
    若为’clear’,则先清空已存在数据(若原表已存在)
    若为’insert’,则直接插入
    若为’replace’,则将已存在的数据更新,不存在的行和列都新插入
    若为’insert_ignore’,则已有的行不更新,不存在的行新插入
    若为’insert_newcols’(谨慎使用),则对已存在的列同直接insert(但是遇到已存在的行时不会报错,可能会丢失数据),有新列则插入新列内容
    若为’insert_ignore_newcols’,则对已存在的列同直接insert_ignore,有新列则插入新列内容

  • cols (None, list) – 需要存入的数据列

  • db_name (str) – 存入的数据库名

  • na_val (None, bool, str) – 设置df中na值的填充值

  • idcols (None, str, list) – 设置唯一值标识字段,只有在表不存在需要新建的时候才起作用

  • col_types (dict) – 指定字段类型

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> tb_name = 'test1'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df0 = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                     'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                     'value': [1, 2, 3, 4, 5, 6],
...                     'value1': [1, 2, '3', 4, 5, 6],
...                     'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, np.nan, np.inf, -np.inf],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     'VALUE3': ['10a', '11b', 12, '13']})
>>> df_to_sql(df, conn, tb_name, act_type='new', idcols=idcols)
>>> df_to_sql(df1, conn, tb_name, act_type='replace', idcols=idcols)

py_hive

PyHive

class dramkit.sqltools.py_hive.PyHive(host, username, password=None, database=None, port=10000, driver='pyhive', **kwargs)

Bases: object

copy()
create_table(tb_name, cols_info, idcols=None, db_name=None, force=False)

新建表

Examples

>>> db = PyHive(host='192.168.118.128', port=10000,
...             username='root', database='test')
>>> idcols = None
>>> idcols = ['a', 'c']
>>> db.create_table('test15',
...                 ('a VARCHAR(255)',
...                  'b FLOAT NOT NULL',
...                  'c DATE'),
...                 idcols=idcols, db_name='test1',
...                 force=True)
df_to_sql(df, tb_name, act_type='replace', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', **kwargs_cols)

把pandas.DataFrame存入Hive数据库中

Examples

>>> db = PyHive(host='192.168.118.128', port=10000,
...             username='root', database='test')
>>> db = PyHive(host='192.168.118.128', port=10000,
...             username='root', database='test',
...             password='123456', driver='impala',
...             auth_mechanism='PLAIN')
>>> tb_name = 'test'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df0 = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                     'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                     'value': [1, 2, 3, 4, 5, 6],
...                     'value1': [1, 2, '3', 4, 5, 6],
...                     'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, np.nan, np.inf, -np.inf],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     'VALUE3': ['10a', '11b', 12, '13']})
>>> db.df_to_sql(df, tb_name, act_type='new', idcols=idcols)
>>> db.df_to_sql(df1, tb_name, act_type='insert_ignore_newcols', idcols=idcols)
drop_cols(tb_name, cols, db_name=None)

删除列

Notes

Hive中,如果表是orc格式的,不支持删除,会报错,
如果是textfile格式,则可以删除。
drop_table(tb_name, db_name=None, purge=True)

删除表

execute_sql(sql_str, db_name=None, to_df=True)
get_conn(*args, **kwargs)
get_fields(tb_name, db_name=None)

获取表字段名列表

get_version()
has_table(tb_name, db_name=None)
merge_into(*args, **kwargs)
merge_into1(tb_tgt, tb_src, replace_cols, idcols, rep_keep='src', db_name=None)
merge_into2(tb_tgt, tb_src, replace_cols, idcols, rep_keep='src', db_name=None)
now_database()
show_tables(db_name=None)

get_conn_pyhive

dramkit.sqltools.py_hive.get_conn_pyhive(host, user, password=None, database='default', port=10000, auth=None, **kwargs)

连接数据库

get_conn_impala

dramkit.sqltools.py_hive.get_conn_impala(host, user, password=None, port=10000, database='default', auth_mechanism=None, **kwargs)

execute_sql

dramkit.sqltools.py_hive.execute_sql(conn, sql_str, db_name=None, to_df=True)

执行sql语句并返回结果

py_mysql

PyMySQL

class dramkit.sqltools.py_mysql.PyMySQL(host='localhost', user='root', password=None, database=None, port=3306, logger=None, **kwargs)

Bases: object

add_cols(tb_name, cols_info, db_name=None)
cancel_split_table(tb_name, db_name=None)
change_cols_info(tb_name, cols_info, db_name=None)
clear_data(tb_name, db_name=None)
copy()
create_database(db_name)
create_table(tb_name, cols_info, idcols=None, db_name=None, force=False)
df_to_sql(df, tb_name, act_type='replace', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)
df_to_sql_by_row(df, tb_name, act_type='insert', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)
drop_cols(tb_name, cols, db_name=None)
drop_data_by_where_str(tb_name, where_str, db_name=None)
drop_database(db_name)
drop_index(tb_name, index_name, db_name=None)
drop_primary_key(tb_name, db_name=None)
drop_table(tb_name, db_name=None)
drop_table_split(tb_name, part_names=None, db_name=None)
execute_sql(sql_str, db_name=None, to_df=True)
get_data(tb_name, cols=None, where_str=None, db_name=None, nlimit=None)
get_data_file_dir()
get_db_info(db_names, tb_names=False)
get_fields(tb_name, db_name=None)
get_id_fields(tb_name, db_name=None)
get_now_database()
get_primary_keys(tb_name, db_name=None)
get_split_tabel_info(tb_name=None, db_name=None)
get_uni_indexs(tb_name, db_name=None)
get_unique_values(tb_name, field, db_name=None)
has_table(tb_name, db_name=None)
modify_cols_type(tb_name, cols_info, db_name=None)
reset_db(db_name)
reset_db_ori()
select_db(db_name)
set_primary_key(tb_name, cols_key, db_name=None)
set_uni_index(tb_name, cols_uni, index_name=None, db_name=None)
show_tables(db_name=None)

get_conn

dramkit.sqltools.py_mysql.get_conn(host='localhost', user='root', password=None, database=None, port=3306, **kwargs)

连接数据库

execute_sql

dramkit.sqltools.py_mysql.execute_sql(conn, sql_str, db_name=None, to_df=True)

执行sql语句并返回结果

has_table

dramkit.sqltools.py_mysql.has_table(conn, tb_name, db_name=None)

get_fields

dramkit.sqltools.py_mysql.get_fields(conn, tb_name, db_name=None)

获取表字段名列表

get_unique_values

dramkit.sqltools.py_mysql.get_unique_values(conn, tb_name, field, db_name=None)

获取某个字段的不重复值

show_tables

dramkit.sqltools.py_mysql.show_tables(conn, db_name=None)

查看已有表名

get_now_database

dramkit.sqltools.py_mysql.get_now_database(conn)

查询当前选择的数据库

get_data_file_dir

dramkit.sqltools.py_mysql.get_data_file_dir(conn)

查看数据库文件所在位置

get_split_tabel_info

dramkit.sqltools.py_mysql.get_split_tabel_info(conn, tb_name=None, db_name=None)

查询表分区信息

drop_table_split

dramkit.sqltools.py_mysql.drop_table_split(conn, tb_name, part_names=None, db_name=None)

删除表分区

cancel_split_table

dramkit.sqltools.py_mysql.cancel_split_table(conn, tb_name, db_name=None)

取消表分区

get_primary_keys

dramkit.sqltools.py_mysql.get_primary_keys(conn, tb_name, db_name=None)

获取主键列名

get_uni_indexs

dramkit.sqltools.py_mysql.get_uni_indexs(conn, tb_name, db_name=None)

获取唯一索引列名

get_id_fields

dramkit.sqltools.py_mysql.get_id_fields(conn, tb_name, db_name=None)

获取表中的唯一值字段列表

set_primary_key

dramkit.sqltools.py_mysql.set_primary_key(conn, tb_name, cols_key, db_name=None)

主键设置

set_uni_index

dramkit.sqltools.py_mysql.set_uni_index(conn, tb_name, cols_uni, db_name=None, index_name=None)

唯一值索引设置

drop_index

dramkit.sqltools.py_mysql.drop_index(conn, tb_name, index_name, db_name=None)

删除索引

drop_primary_key

dramkit.sqltools.py_mysql.drop_primary_key(conn, tb_name, db_name=None)

删除主键

create_database

dramkit.sqltools.py_mysql.create_database(conn, db_name)

新建数据库

copy_table

dramkit.sqltools.py_mysql.copy_table(conn, tb_name, db_name=None, with_data=False)

复制表

create_table

dramkit.sqltools.py_mysql.create_table(conn, tb_name, cols_info, idcols=None, db_name=None, force=False)

新建表

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> idcols = None
>>> idcols=['a', 'c']
>>> create_table(conn, 'test2',
...              ('a VARCHAR(255)',
...               'b DOUBLE NOT NULL DEFAULT 1',
...               'c DATETIME'),
...              idcols=idcols,
...              db_name='test', force=True)

drop_database

dramkit.sqltools.py_mysql.drop_database(conn, db_name)

删除数据库

drop_table

dramkit.sqltools.py_mysql.drop_table(conn, tb_name, db_name=None)

删除表

drop_cols

dramkit.sqltools.py_mysql.drop_cols(conn, tb_name, cols, db_name=None)

删除字段

clear_data

dramkit.sqltools.py_mysql.clear_data(conn, tb_name, db_name=None)

清空数据

get_db_info

dramkit.sqltools.py_mysql.get_db_info(conn, db_names, tb_names=False)
获取数据库信息
tb_names若为False,则结果只统计数据库,不包含单表;
若为None,则包含所有单表;也可以为str或list,指定单个或多个表

get_data

dramkit.sqltools.py_mysql.get_data(conn, tb_name, cols=None, where_str=None, db_name=None, nlimit=None)

获取数据

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> get_data(conn, 'test1', cols=None,
...          where_str='year = "2012"',
...          db_name='test')
>>> get_data(conn, 'test1', cols='value',
...          where_str='value2 IS NULL',
...          db_name='test')

get_data_tables

dramkit.sqltools.py_mysql.get_data_tables(conn, tb_cols_dict, join_cols, db_name=None)

联表查数据

drop_data_by_where_str

dramkit.sqltools.py_mysql.drop_data_by_where_str(conn, tb_name, where_str, db_name=None)

删除数据,where_str为where条件语句

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> drop_data_by_where_str(conn, 'test1',
...                        'year = "2012"',
...                        db_name='test')
>>> drop_data_by_where_str(conn, 'test1',
...                        'value2 IS NULL',
...                        db_name='test')

add_cols

dramkit.sqltools.py_mysql.add_cols(conn, tb_name, cols_info, db_name=None)

新增字段

Todo

在指定位置处插入新的列?

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> add_cols(conn, 'test1',
...          ('a VARCHAR(255)',
...           'b DOUBLE NOT NULL DEFAULT 1',
...           'c DATETIME'),
...          db_name='test')

modify_cols_type

dramkit.sqltools.py_mysql.modify_cols_type(conn, tb_name, cols_info, db_name=None)

更改字段属性

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> modify_cols_type(conn, 'test1',
...                  ('code VARCHAR(21)',
...                   'year VARCHAR(20) DEFAULT "XX"'),
...                  db_name='test')

change_cols_info

dramkit.sqltools.py_mysql.change_cols_info(conn, tb_name, cols_info, db_name=None)

修改字段信息,如重命名,修改字段类型等

get_cols_info_df

dramkit.sqltools.py_mysql.get_cols_info_df(df, cols=None, col_types={}, all2str=False, big_text_cols=[])

根据pd.DataFrame中的列cols,识别对应列在MySQL中的字段类别

Parameters:
  • df (pandas.DataFrame) – 待识别数据

  • cols (list, None) – 待识别列名列表,默认所有列

  • col_types (dict) – 指定列类型,如{‘col1’: ‘VARCHAR(20)’, ‘col2’: ‘BIGINT’},指定的列不做判断,直接返回

  • all2str (bool) – 若为True,则数据均按文本类型处理

  • big_text_cols (str, list) – 文本是否为长文本,若为’all’,则全部按长文本处理,若为list,则list指定列按长文本处理

Returns:

  • cols_info (str) – 列类型信息,格式如’col1 col1_type, col2 col2_type, …’

  • dtype (dict) – 字典格式的类类型信息

  • placeholder (str) – 占位符信息,格式如’%s, %s, …’

References

https://blog.csdn.net/tonydz0523/article/details/82529941

df_to_sql_insert_values

dramkit.sqltools.py_mysql.df_to_sql_insert_values(df, cols=None)

df转化为mysql插入的VALUES格式

Examples

>>> DB = PyMySQL(host='localhost',
...              user='root',
...              password='xxxxxxxxxxx',
...              database='test',
...              port=3306,
...              logger=None)
>>> import numpy as np
>>> df4 = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                     'id2': [2, 3, 4, 5, 6],
...                     'col1': ['a', 'b', 'c', 'd', 'e'],
...                     'col2': [2, 4, 6, 8, 10]})
>>> DB.df_to_sql(df4,'test3', 'new',
...              db_name='test', idcols=['id1', 'id2'])
>>> df = pd.DataFrame({'id1': [6, 7, 8, 9, 10],
...                    'id2': [7, 8, 9, 10, 11],
...                    'col1': ['f', 'g', 'h', 'i', 'j'],
...                    'col2': [3, 6, 9, 12, 15]})
>>> values = df_to_sql_insert_values(df)
>>> DB.execute_sql('INSERT INTO test3 VALUES {}'.format(values))

df_to_sql

dramkit.sqltools.py_mysql.df_to_sql(df, conn, tb_name, act_type='replace', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)

把pandas.DataFrame存入MySQL数据库中

Parameters:
  • df (pandas.DataFrame) – 待存数据

  • conn (pymysql.connect) – pymysql.connect数据库连接对象

  • tb_name (str) – 存入的表名

  • act_type (str) –

    存入方式:
    若为’ignore_tb’,则当表已经存在时不进行任何操作
    若为’new’,则新建表(若原表已存在,则会先删除再重建)
    若为’clear’,则先清空原表数据(若原表已存在)
    若为’insert’,则直接插入
    若为’replace’,则将已存在的数据更新,不存在的行和列都新插入
    若为’insert_ignore’,则已有的行不更新,不存在的行新插入
    若为’insert_newcols’(谨慎使用),则对已存在的列同直接insert(但是遇到已存在的行时不会报错,可能会丢失数据),有新列则插入新列内容
    若为’insert_ignore_newcols’,则对已存在的列同直接insert_ignore,有新列则插入新列内容

  • cols (None, list) – 需要存入的数据列

  • db_name (str) – 存入的数据库名

  • na_val (None, bool, str) – 设置df中na值的填充值

  • idcols (None, str, list) – 设置唯一值标识字段,只有在表不存在需要新建的时候才起作用

  • col_types (dict) – 指定字段类型

Examples

>>> conn = get_conn(password='xxxxxxxxxxx')
>>> db_name = 'test'
>>> tb_name = 'test1'
>>> tb_name2 = 'test2'
>>> df = pd.DataFrame({'code': ['001', '002', '003'],
...                    'year': ['2011', '2012', '2013'],
...                    'value': [1, 2, 3]})
>>> df_to_sql(df, conn, tb_name, act_type='new', db_name=db_name)
>>> df_to_sql(df.rename(columns={'value': 'VALUE'}), conn, tb_name, act_type='replace', db_name=db_name)
>>> df_to_sql(df, conn, tb_name2, act_type='new',
              db_name=db_name, idcols=['code', 'year'])
>>> df2 = pd.DataFrame({'code': ['002', '003', '005'],
...                     'value': [2, 4, 5],
...                     'year': ['2012', '2014', '2015'],
...                     'value2': [3, 5, 6]})
>>> df_to_sql(df2, conn, tb_name, act_type='insert',
...           db_name=db_name, cols='value2')
>>> df_to_sql(df2, conn, tb_name, act_type='replace',
...           db_name=db_name, cols='value2')
>>> df_to_sql(df, conn, tb_name, act_type='new', db_name=db_name)
>>> set_primary_key(conn, tb_name, 'code', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='replace', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert_ignore', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert_ignore', cols='value2', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert_ignore', cols=['code', 'value2'], db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='replace', cols=['code', 'value2'], db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='replace', cols=['code', 'year', 'value2'], db_name=db_name)
>>> modify_cols_type(conn, 'test1', ('code VARCHAR(20)', 'year VARCHAR(10) DEFAULT "XX"', ), db_name)
>>> df3 = pd.DataFrame({'code': ['006', '007', '008'],
...                     'value': [6, 7, 8],
...                     'value2': [7, 8, 9]})
>>> df_to_sql(df3, conn, tb_name, act_type='replace', db_name=db_name)
...
>>> import numpy as np
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> df4 = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                     'id2': [2, 3, 4, 5, 6],
...                     'col1': ['a', 'b', 'c', 'd', 'e'],
...                     'col2': [2, 4, 6, 8, 10]})
>>> df5 = pd.DataFrame({'id1': [3, 4, 5, 6, 7],
...                     'id2': [4, 5, 6, 7, 8],
...                     'col1': ['c', 'ddd', np.nan, 'f', 'g'],
...                     'col3': [6, 8, 10, 12, 14]})
>>> df_to_sql(df4, conn, 'test3', act_type='new',
...           db_name='test', idcols=['id1', 'id2'])
>>> df_to_sql(df5, conn, 'test3',
...           act_type='insert',
...           db_name='test', idcols=['id1', 'id2'])
>>> # 测试2
>>> conn = get_conn(password='xxxxxxxxxxx', database='test')
>>> tb_name = 'test6'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df0 = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                     'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                     'value': [1, 2, 3, 4, 5, 6],
...                     'value1': [1, 2, '3', 4, 5, 6],
...                     'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, np.nan, np.inf, -np.inf],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     'VALUE3': ['10a', '11b', 12, '13']})
>>> df_to_sql(df, conn, tb_name, act_type='new', idcols=idcols)
>>> df_to_sql(df1, conn, tb_name, act_type='replace', idcols=idcols)

References

df_to_sql_by_row

dramkit.sqltools.py_mysql.df_to_sql_by_row(df, conn, tb_name, act_type='insert', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)

把pandas.DataFrame存入MySQL数据库中(不考虑列的新增或缺省)

参数见 df_to_sql()

Note

判断数据是否存在时是根据主键或唯一索引来的, 因此当待插入数据字段只是已存在数据字段的一部分时, 此函数应慎用’replace’(可能导致原有数据变成Null)

gen_simple_proc_sql_str

dramkit.sqltools.py_mysql.gen_simple_proc_sql_str(sql_list, proc_name=None, with_call=True, delimiter=False)

生成存储过程mysql语句

Note

  • DELIMITER语句在pymysql中执行会报错,不知道为啥?故若要在pymysql中执行生成的sql语句,delimiter应设置为False

  • ROLLBACK只对DML语句(INSERT、DELETE、UPDATE,SELECT除外)起作用

  • (DDL语句(Data Definition Language):CREATE、ALTER、DROP)

  • (DML(Data Manipulation Language):INSERT、DELETE、UPDATE、SELECT)

  • (DCL语句(Data Control Language):GRANT、ROLLBACK、COMMIT)

Examples

>>> DB = PyMySQL(host='localhost',
...              user='root',
...              password='xxxxxxxxxxx',
...              database=None,
...              port=3306,
...              logger=None)
>>> import numpy as np
>>> df4 = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                     'id2': [2, 3, 4, 5, 6],
...                     'col1': ['a', 'b', 'c', 'd', 'e'],
...                     'col2': [2, 4, 6, 8, 10]})
>>> DB.df_to_sql(df4,'test3', 'new',
...              db_name='test', idcols=['id1', 'id2'])
>>> delimiter = False
>>> sql_list = ['ALTER TABLE test3 ADD col3 VARCHAR(10);',
...             'INSERT INTO test3 VALUES (8, 9, 10, 11, 12);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # 执行产生的sql,test3表会新增一列和一行
>>> sql_list = ['ALTER TABLE test3 ADD col4 VARCHAR(10);',
...             'INSERT INTO test3 VALUES (8, 9, 10, 11, 12, 13);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # 执行产生的sql,test3表增加一列(ROLLBACK对增加字段操作无效)
>>> sql_list = ['INSERT INTO test3 VALUES (9, 10, 11, 12, 14, 14);',
...             'INSERT INTO test3 VALUES (8, 9, 10, 11, 12, 13);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # 执行产生的sql,test3表不会变
>>> sql_list = ['INSERT INTO test3 VALUES (9, 10, 11, 12, 14, 14);',
...             'INSERT INTO test3 VALUES (10, 11, 12, 13, 14, 15);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # 执行产生的sql,test3表会增加两行

References

sql_alchemy

SQLAlchemy

class dramkit.sqltools.sql_alchemy.SQLAlchemy(dialect='mysql', driver='pymysql', username='root', password=None, host='localhost', port=3306, database=None, orcl_pdb=False, **kwargs)

Bases: object

copy()
df_to_oracle(df, tb_name, act_type='replace', idcols=None, col_types={}, **kwargs_cols)

df存入Oracle

Examples

>>> db = SQLAlchemy(dialect='oracle', driver='cx_oracle',
...                 username='test', password='xxxxxxxxxxx',
...                 host='localhost', port=1521,
...                 orcl_pdb=True, database='orclpdb')
>>> tb_name = 'test3'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, 4, 5, 6],
...                    # 'value1': [1, 2, '3', 4, 5, 6],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     # 'VALUE3': ['10a', '11b', '12', '13']
...                    })
>>> db.df_to_sql(df, tb_name, act_type='new', idcols=idcols)
>>> db.df_to_sql(df1, tb_name, act_type='replace', idcols=idcols)
df_to_sql(*args, **kwargs)
drop_table(tb_name, purge=True)

删除表

get_fields(tb_name)

获取表字段名列表

get_tables()
has_table(tb_name)
oracle_merge_into(tb_tgt, tb_src, cols, idcols, rep_keep='src')

get_conn

dramkit.sqltools.sql_alchemy.get_conn(dialect='mysql', driver='pymysql', username='root', password=None, host='localhost', port=3306, database=None, orcl_pdb=False, **kwargs)

TODO: oracle连接参数有service_name的处理?

get_cols_info_df_oracle

dramkit.sqltools.sql_alchemy.get_cols_info_df_oracle(df, cols=None, col_types={}, all2str=False, big_text_cols=[])

根据pd.DataFrame中的列cols,识别对应列在Oracle中的字段类别

Parameters:
  • df (pandas.DataFrame) – 待识别数据

  • cols (list, None) – 待识别列名列表,默认所有列

  • col_types (dict) – 指定列类型,如{‘col1’: ‘NVARCHAR(20)’, ‘col2’: ‘INT’},指定的列不做判断,直接返回

  • all2str (bool) – 若为True,则数据均按文本类型处理

  • big_text_cols (str, list) – 文本是否为长文本,若为’all’,则全部按长文本处理,若为list,则list指定列按长文本处理

Returns:

  • cols_info (str) – 列类型信息,格式如’col1 col1_type, col2 col2_type, …’

  • dtype (dict) – 字典格式的类类型信息

  • placeholder (str) – 占位符信息,格式如’:1, :2, …’

References

https://blog.csdn.net/tonydz0523/article/details/82529941