sqltools
cxoracle
CxOracle
- class dramkit.sqltools.cxoracle.CxOracle(host='localhost', user='test', password='xxxxxxxxxxx', database='orclpdb', port=1521, logger=None, **kwargs)
Bases:
object
- clear_data(tb_name)
- copy()
- create_table(tb_name, cols_info, idcols=None, force=False)
- df_to_sql(df, tb_name, act_type='insert', cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', **kwargs_cols)
- drop_table(tb_name, purge=True)
- execute_sql(sql_str, to_df=True)
- get_create_table_sql(tb_name)
- get_fields(tb_name)
- get_tables()
- has_table(tb_name)
get_conn
- dramkit.sqltools.cxoracle.get_conn(host='localhost', user='test', password=None, database='orclpdb', port=1521, **kwargs)
Connect to the database.
Examples
>>> host, user = 'localhost', 'test'
>>> database, port = 'orclpdb', 1521
>>> password = 'xxxxxxxxxxx'
>>> con1 = get_conn(host=host, user=user,
...                 password=password,
...                 database=database, port=port)
>>> host, user = 'localhost', 'c##test'
>>> database, port = 'orcl', 1521
>>> password = 'xxxxxxxxxxx'
>>> con2 = get_conn(host=host, user=user,
...                 password=password,
...                 database=database, port=port)
execute_sql
- dramkit.sqltools.cxoracle.execute_sql(conn, sql_str, to_df=True)
Execute a SQL statement and return the result.
Examples
>>> conn = _get_test_conn()
>>> df1 = execute_sql(conn, 'select * from test1')
>>> # execute_sql(conn, 'drop table test2 purge')
>>> execute_sql(conn,
...     """create table test2
...     (code varchar(20), year int, val1 float)""")
>>> df2 = execute_sql(conn, 'select * from test2')
>>> execute_sql(conn, """insert into test2 values ('a', 2022, 1)""")
>>> df2 = execute_sql(conn, 'select * from test2')
get_fields
- dramkit.sqltools.cxoracle.get_fields(conn, tb_name)
Get the list of field names of a table.
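As an illustration of what such a lookup involves, Oracle exposes column names through the user_tab_columns data-dictionary view; the query below is a minimal sketch of that approach (an assumption about the implementation, not dramkit's actual code):

```python
def get_fields_sql(tb_name: str) -> str:
    """Build a query returning the column names of an Oracle table.

    user_tab_columns holds one row per column of every table owned by
    the current user; Oracle stores unquoted table names in upper case,
    hence the .upper() call.
    """
    return (
        "SELECT column_name FROM user_tab_columns "
        "WHERE table_name = '{}' ORDER BY column_id".format(tb_name.upper())
    )
```

Executing this query and collecting the first column of each result row would yield the field-name list.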
clear_data
- dramkit.sqltools.cxoracle.clear_data(conn, tb_name)
Clear all data in a table.
has_table
- dramkit.sqltools.cxoracle.has_table(conn, tb_name)
copy_table
- dramkit.sqltools.cxoracle.copy_table(conn, tb_name, with_data=False)
Copy a table.
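Oracle can copy a table with CREATE TABLE ... AS SELECT (CTAS); a sketch of the statement such a helper might build, matching the with_data flag above (a hypothetical builder, not the library's code; note that CTAS does not carry over primary keys or indexes):

```python
def copy_table_sql(tb_name: str, new_name: str, with_data: bool = False) -> str:
    """Build a CREATE TABLE ... AS SELECT statement.

    With with_data=False, a 'WHERE 1 = 0' clause copies only the
    structure (the SELECT matches no rows); with with_data=True the
    data is copied as well.
    """
    sql = 'CREATE TABLE {} AS SELECT * FROM {}'.format(new_name, tb_name)
    if not with_data:
        sql += ' WHERE 1 = 0'
    return sql
```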
create_table
- dramkit.sqltools.cxoracle.create_table(conn, tb_name, cols_info, idcols=None, force=False)
Create a new table.
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> # idcols = None
>>> idcols = ['a', 'c']
>>> create_table(conn, 'test2',
...              ('a VARCHAR2(255)',
...               'b FLOAT DEFAULT 1 NOT NULL',
...               'c DATE'),
...              idcols=idcols, force=True)
get_create_table_sql
- dramkit.sqltools.cxoracle.get_create_table_sql(conn, tb_name)
Get the CREATE TABLE statement of a table.
drop_table
- dramkit.sqltools.cxoracle.drop_table(conn, tb_name, purge=True)
Drop a table.
get_tables
- dramkit.sqltools.cxoracle.get_tables(conn)
get_cols_info_df
- dramkit.sqltools.cxoracle.get_cols_info_df(df, cols=None, col_types={}, all2str=False, big_text_cols=[])
Infer the Oracle field type for the columns cols of a pd.DataFrame.
- Parameters:
df (pandas.DataFrame) – data whose column types are to be inferred
cols (list, None) – list of column names to inspect; defaults to all columns
col_types (dict) – explicitly specified column types, e.g. {'col1': 'VARCHAR2(20)', 'col2': 'NUMBER(10,0)'}; specified columns are returned as-is without inference
all2str (bool) – if True, all columns are treated as text
big_text_cols (str, list) – columns to treat as long text; if 'all', every text column is treated as long text; if a list, only the listed columns are
- Returns:
cols_info (str) – column type info, formatted like 'col1 col1_type, col2 col2_type, …'
dtype (dict) – the same column type info as a dict
placeholder (str) – placeholder string, formatted like ':1, :2, …'
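A minimal sketch of this kind of dtype-to-Oracle-type inference (the concrete type names and widths here are illustrative assumptions, not the mapping dramkit actually applies):

```python
def infer_oracle_type(dtype_name: str, big_text: bool = False) -> str:
    """Map a pandas dtype name to a plausible Oracle column type.

    The widths and the CLOB choice for long text are illustrative
    assumptions only.
    """
    if big_text:
        return 'CLOB'
    if dtype_name.startswith('int'):
        return 'NUMBER(20,0)'
    if dtype_name.startswith('float'):
        return 'FLOAT'
    if dtype_name.startswith('datetime'):
        return 'DATE'
    return 'VARCHAR2(255)'  # object/str and anything unrecognized

# Build the three documented return values from dtype names
# (as produced by e.g. df.dtypes.astype(str).to_dict()):
cols = {'code': 'object', 'year': 'int64', 'value': 'float64'}
cols_info = ', '.join('{} {}'.format(c, infer_oracle_type(t))
                      for c, t in cols.items())
placeholder = ', '.join(':{}'.format(i + 1) for i in range(len(cols)))
```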
merge_into
- dramkit.sqltools.cxoracle.merge_into(conn, tb_tgt, tb_src, cols, idcols, rep_keep='src')
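No docstring is given, but the name suggests an Oracle MERGE (upsert) between a source and a target table. A hedged sketch of the statement such a function might build (handling of the rep_keep precedence flag is omitted; this is not the library's actual SQL):

```python
def merge_into_sql(tb_tgt, tb_src, cols, idcols):
    """Build an Oracle MERGE statement: rows matching on the id
    columns are updated from the source, non-matching rows are
    inserted."""
    on = ' AND '.join('t.{0} = s.{0}'.format(c) for c in idcols)
    upd = ', '.join('t.{0} = s.{0}'.format(c)
                    for c in cols if c not in idcols)
    ins_cols = ', '.join(cols)
    ins_vals = ', '.join('s.{}'.format(c) for c in cols)
    return ('MERGE INTO {tgt} t USING {src} s ON ({on}) '
            'WHEN MATCHED THEN UPDATE SET {upd} '
            'WHEN NOT MATCHED THEN INSERT ({ic}) VALUES ({iv})').format(
                tgt=tb_tgt, src=tb_src, on=on, upd=upd,
                ic=ins_cols, iv=ins_vals)
```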
df_to_sql
- dramkit.sqltools.cxoracle.df_to_sql(df, conn, tb_name, act_type='insert', cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', **kwargs_cols)
Save a pandas.DataFrame into an Oracle database.
- Parameters:
df (pandas.DataFrame) – data to save
conn (cx_oracle.connect) – cx_oracle.connect database connection object
tb_name (str) – name of the target table
act_type (str) – how to save the data:
'ignore_tb': do nothing if the table already exists;
'new': create the table anew (an existing table is dropped and recreated);
'clear': first clear existing data (if the table exists);
'insert': insert directly;
'replace': update rows that already exist; rows and columns that do not exist are inserted;
'insert_ignore': existing rows are left untouched; missing rows are inserted;
'insert_newcols' (use with caution): for existing columns behaves like plain insert (existing rows do not raise an error, so data may be lost); new columns are added and filled;
'insert_ignore_newcols': for existing columns behaves like insert_ignore; new columns are added and filled
cols (None, list) – columns to save
na_val (None, bool, str) – fill value for NA values in df
idcols (None, str, list) – unique-key (identifier) columns; only takes effect when the table does not exist and has to be created
col_types (dict) – explicitly specified column types
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> tb_name = 'test1'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df0 = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                     'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                     'value': [1, 2, 3, 4, 5, 6],
...                     'value1': [1, 2, '3', 4, 5, 6],
...                     'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, np.nan, np.inf, -np.inf],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     'VALUE3': ['10a', '11b', 12, '13']})
>>> df_to_sql(df, conn, tb_name, act_type='new', idcols=idcols)
>>> df_to_sql(df1, conn, tb_name, act_type='replace', idcols=idcols)
py_hive
PyHive
- class dramkit.sqltools.py_hive.PyHive(host, username, password=None, database=None, port=10000, driver='pyhive', **kwargs)
Bases:
object
- copy()
- create_table(tb_name, cols_info, idcols=None, db_name=None, force=False)
Create a new table.
Examples
>>> db = PyHive(host='192.168.118.128', port=10000,
...             username='root', database='test')
>>> # idcols = None
>>> idcols = ['a', 'c']
>>> db.create_table('test15',
...                 ('a VARCHAR(255)',
...                  'b FLOAT NOT NULL',
...                  'c DATE'),
...                 idcols=idcols, db_name='test1',
...                 force=True)
- df_to_sql(df, tb_name, act_type='replace', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', **kwargs_cols)
Save a pandas.DataFrame into a Hive database.
Examples
>>> db = PyHive(host='192.168.118.128', port=10000,
...             username='root', database='test')
>>> db = PyHive(host='192.168.118.128', port=10000,
...             username='root', database='test',
...             password='123456', driver='impala',
...             auth_mechanism='PLAIN')
>>> tb_name = 'test'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df0 = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                     'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                     'value': [1, 2, 3, 4, 5, 6],
...                     'value1': [1, 2, '3', 4, 5, 6],
...                     'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, np.nan, np.inf, -np.inf],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     'VALUE3': ['10a', '11b', 12, '13']})
>>> db.df_to_sql(df, tb_name, act_type='new', idcols=idcols)
>>> db.df_to_sql(df1, tb_name, act_type='insert_ignore_newcols', idcols=idcols)
- drop_cols(tb_name, cols, db_name=None)
Drop columns.
Notes
In Hive, if the table is stored in ORC format, dropping columns is not supported and raises an error; with the TEXTFILE format, columns can be dropped.
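For context: Hive has no DROP COLUMN statement; columns are dropped by re-declaring the columns to keep via ALTER TABLE ... REPLACE COLUMNS, and it is that statement which fails on ORC tables. A sketch of the statement presumably issued (the column definitions here are hypothetical):

```python
def hive_drop_cols_sql(tb_name, keep_cols_info):
    """Build Hive's column-dropping statement: list the surviving
    columns with their types; anything not listed is dropped.
    Succeeds on TEXTFILE tables, raises an error on ORC tables."""
    return 'ALTER TABLE {} REPLACE COLUMNS ({})'.format(
        tb_name, ', '.join(keep_cols_info))

# Keep columns a and b of test15, dropping everything else:
sql = hive_drop_cols_sql('test15', ['a VARCHAR(255)', 'b FLOAT'])
```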
- drop_table(tb_name, db_name=None, purge=True)
Drop a table.
- execute_sql(sql_str, db_name=None, to_df=True)
- get_conn(*args, **kwargs)
- get_fields(tb_name, db_name=None)
Get the list of field names of a table.
- get_version()
- has_table(tb_name, db_name=None)
- merge_into(*args, **kwargs)
- merge_into1(tb_tgt, tb_src, replace_cols, idcols, rep_keep='src', db_name=None)
- merge_into2(tb_tgt, tb_src, replace_cols, idcols, rep_keep='src', db_name=None)
- now_database()
- show_tables(db_name=None)
get_conn_pyhive
- dramkit.sqltools.py_hive.get_conn_pyhive(host, user, password=None, database='default', port=10000, auth=None, **kwargs)
Connect to the database.
get_conn_impala
- dramkit.sqltools.py_hive.get_conn_impala(host, user, password=None, port=10000, database='default', auth_mechanism=None, **kwargs)
execute_sql
- dramkit.sqltools.py_hive.execute_sql(conn, sql_str, db_name=None, to_df=True)
Execute a SQL statement and return the result.
py_mysql
PyMySQL
- class dramkit.sqltools.py_mysql.PyMySQL(host='localhost', user='root', password=None, database=None, port=3306, logger=None, **kwargs)
Bases:
object
- add_cols(tb_name, cols_info, db_name=None)
- cancel_split_table(tb_name, db_name=None)
- change_cols_info(tb_name, cols_info, db_name=None)
- clear_data(tb_name, db_name=None)
- copy()
- create_database(db_name)
- create_table(tb_name, cols_info, idcols=None, db_name=None, force=False)
- df_to_sql(df, tb_name, act_type='replace', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)
- df_to_sql_by_row(df, tb_name, act_type='insert', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)
- drop_cols(tb_name, cols, db_name=None)
- drop_data_by_where_str(tb_name, where_str, db_name=None)
- drop_database(db_name)
- drop_index(tb_name, index_name, db_name=None)
- drop_primary_key(tb_name, db_name=None)
- drop_table(tb_name, db_name=None)
- drop_table_split(tb_name, part_names=None, db_name=None)
- execute_sql(sql_str, db_name=None, to_df=True)
- get_data(tb_name, cols=None, where_str=None, db_name=None, nlimit=None)
- get_data_file_dir()
- get_db_info(db_names, tb_names=False)
- get_fields(tb_name, db_name=None)
- get_id_fields(tb_name, db_name=None)
- get_now_database()
- get_primary_keys(tb_name, db_name=None)
- get_split_tabel_info(tb_name=None, db_name=None)
- get_uni_indexs(tb_name, db_name=None)
- get_unique_values(tb_name, field, db_name=None)
- has_table(tb_name, db_name=None)
- modify_cols_type(tb_name, cols_info, db_name=None)
- reset_db(db_name)
- reset_db_ori()
- select_db(db_name)
- set_primary_key(tb_name, cols_key, db_name=None)
- set_uni_index(tb_name, cols_uni, index_name=None, db_name=None)
- show_tables(db_name=None)
get_conn
- dramkit.sqltools.py_mysql.get_conn(host='localhost', user='root', password=None, database=None, port=3306, **kwargs)
Connect to the database.
execute_sql
- dramkit.sqltools.py_mysql.execute_sql(conn, sql_str, db_name=None, to_df=True)
Execute a SQL statement and return the result.
has_table
- dramkit.sqltools.py_mysql.has_table(conn, tb_name, db_name=None)
get_fields
- dramkit.sqltools.py_mysql.get_fields(conn, tb_name, db_name=None)
Get the list of field names of a table.
get_unique_values
- dramkit.sqltools.py_mysql.get_unique_values(conn, tb_name, field, db_name=None)
Get the distinct values of a field.
show_tables
- dramkit.sqltools.py_mysql.show_tables(conn, db_name=None)
List existing table names.
get_now_database
- dramkit.sqltools.py_mysql.get_now_database(conn)
Get the currently selected database.
get_data_file_dir
- dramkit.sqltools.py_mysql.get_data_file_dir(conn)
Get the directory where the database files are stored.
get_split_tabel_info
- dramkit.sqltools.py_mysql.get_split_tabel_info(conn, tb_name=None, db_name=None)
Get table partition info.
drop_table_split
- dramkit.sqltools.py_mysql.drop_table_split(conn, tb_name, part_names=None, db_name=None)
Drop table partitions.
cancel_split_table
- dramkit.sqltools.py_mysql.cancel_split_table(conn, tb_name, db_name=None)
Remove table partitioning.
get_primary_keys
- dramkit.sqltools.py_mysql.get_primary_keys(conn, tb_name, db_name=None)
Get the primary key column names.
get_uni_indexs
- dramkit.sqltools.py_mysql.get_uni_indexs(conn, tb_name, db_name=None)
Get the unique index column names.
get_id_fields
- dramkit.sqltools.py_mysql.get_id_fields(conn, tb_name, db_name=None)
Get the list of unique-value fields of a table.
set_primary_key
- dramkit.sqltools.py_mysql.set_primary_key(conn, tb_name, cols_key, db_name=None)
Set the primary key.
set_uni_index
- dramkit.sqltools.py_mysql.set_uni_index(conn, tb_name, cols_uni, db_name=None, index_name=None)
Set a unique index.
drop_index
- dramkit.sqltools.py_mysql.drop_index(conn, tb_name, index_name, db_name=None)
Drop an index.
drop_primary_key
- dramkit.sqltools.py_mysql.drop_primary_key(conn, tb_name, db_name=None)
Drop the primary key.
create_database
- dramkit.sqltools.py_mysql.create_database(conn, db_name)
Create a database.
copy_table
- dramkit.sqltools.py_mysql.copy_table(conn, tb_name, db_name=None, with_data=False)
Copy a table.
create_table
- dramkit.sqltools.py_mysql.create_table(conn, tb_name, cols_info, idcols=None, db_name=None, force=False)
Create a new table.
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> # idcols = None
>>> idcols = ['a', 'c']
>>> create_table(conn, 'test2',
...              ('a VARCHAR(255)',
...               'b DOUBLE NOT NULL DEFAULT 1',
...               'c DATETIME'),
...              idcols=idcols,
...              db_name='test', force=True)
drop_database
- dramkit.sqltools.py_mysql.drop_database(conn, db_name)
Drop a database.
drop_table
- dramkit.sqltools.py_mysql.drop_table(conn, tb_name, db_name=None)
Drop a table.
drop_cols
- dramkit.sqltools.py_mysql.drop_cols(conn, tb_name, cols, db_name=None)
Drop columns.
clear_data
- dramkit.sqltools.py_mysql.clear_data(conn, tb_name, db_name=None)
Clear all data.
get_db_info
- dramkit.sqltools.py_mysql.get_db_info(conn, db_names, tb_names=False)
- Get database info. If tb_names is False, the result only covers whole databases, with no per-table statistics; if None, all individual tables are included; a str or list specifies one or more tables.
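Database-level statistics of this kind are typically gathered from MySQL's information_schema.tables; the query below is a sketch in that spirit (an assumption, not the function's actual SQL):

```python
def db_size_sql(db_names):
    """Build a query summarizing table count, approximate row count,
    and on-disk size per database from information_schema.tables."""
    in_list = ', '.join("'{}'".format(d) for d in db_names)
    return ('SELECT table_schema, COUNT(*) AS n_tables, '
            'SUM(table_rows) AS rows_approx, '
            'SUM(data_length + index_length) AS size_bytes '
            'FROM information_schema.tables '
            'WHERE table_schema IN ({}) '
            'GROUP BY table_schema'.format(in_list))
```

Dropping the GROUP BY and grouping by table_name instead would give the per-table variant.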
get_data
- dramkit.sqltools.py_mysql.get_data(conn, tb_name, cols=None, where_str=None, db_name=None, nlimit=None)
Get data.
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> get_data(conn, 'test1', cols=None,
...          where_str='year = "2012"',
...          db_name='test')
>>> get_data(conn, 'test1', cols='value',
...          where_str='value2 IS NULL',
...          db_name='test')
get_data_tables
- dramkit.sqltools.py_mysql.get_data_tables(conn, tb_cols_dict, join_cols, db_name=None)
Query data across joined tables.
drop_data_by_where_str
- dramkit.sqltools.py_mysql.drop_data_by_where_str(conn, tb_name, where_str, db_name=None)
Delete data; where_str is the WHERE clause.
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> drop_data_by_where_str(conn, 'test1',
...                        'year = "2012"',
...                        db_name='test')
>>> drop_data_by_where_str(conn, 'test1',
...                        'value2 IS NULL',
...                        db_name='test')
add_cols
- dramkit.sqltools.py_mysql.add_cols(conn, tb_name, cols_info, db_name=None)
Add new columns.
Todo
Insert new columns at a specified position?
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> add_cols(conn, 'test1',
...          ('a VARCHAR(255)',
...           'b DOUBLE NOT NULL DEFAULT 1',
...           'c DATETIME'),
...          db_name='test')
modify_cols_type
- dramkit.sqltools.py_mysql.modify_cols_type(conn, tb_name, cols_info, db_name=None)
Change column attributes (types).
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> modify_cols_type(conn, 'test1',
...                  ('code VARCHAR(21)',
...                   'year VARCHAR(20) DEFAULT "XX"'),
...                  db_name='test')
change_cols_info
- dramkit.sqltools.py_mysql.change_cols_info(conn, tb_name, cols_info, db_name=None)
Modify column info, e.g. renaming a column or changing its type.
get_cols_info_df
- dramkit.sqltools.py_mysql.get_cols_info_df(df, cols=None, col_types={}, all2str=False, big_text_cols=[])
Infer the MySQL field type for the columns cols of a pd.DataFrame.
- Parameters:
df (pandas.DataFrame) – data whose column types are to be inferred
cols (list, None) – list of column names to inspect; defaults to all columns
col_types (dict) – explicitly specified column types, e.g. {'col1': 'VARCHAR(20)', 'col2': 'BIGINT'}; specified columns are returned as-is without inference
all2str (bool) – if True, all columns are treated as text
big_text_cols (str, list) – columns to treat as long text; if 'all', every text column is treated as long text; if a list, only the listed columns are
- Returns:
cols_info (str) – column type info, formatted like 'col1 col1_type, col2 col2_type, …'
dtype (dict) – the same column type info as a dict
placeholder (str) – placeholder string, formatted like '%s, %s, …'
df_to_sql_insert_values
- dramkit.sqltools.py_mysql.df_to_sql_insert_values(df, cols=None)
Convert a df into the VALUES format of a MySQL INSERT statement.
Examples
>>> DB = PyMySQL(host='localhost',
...              user='root',
...              password='xxxxxxxxxxx',
...              database='test',
...              port=3306,
...              logger=None)
>>> import numpy as np
>>> df4 = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                     'id2': [2, 3, 4, 5, 6],
...                     'col1': ['a', 'b', 'c', 'd', 'e'],
...                     'col2': [2, 4, 6, 8, 10]})
>>> DB.df_to_sql(df4, 'test3', 'new',
...              db_name='test', idcols=['id1', 'id2'])
>>> df = pd.DataFrame({'id1': [6, 7, 8, 9, 10],
...                    'id2': [7, 8, 9, 10, 11],
...                    'col1': ['f', 'g', 'h', 'i', 'j'],
...                    'col2': [3, 6, 9, 12, 15]})
>>> values = df_to_sql_insert_values(df)
>>> DB.execute_sql('INSERT INTO test3 VALUES {}'.format(values))
df_to_sql
- dramkit.sqltools.py_mysql.df_to_sql(df, conn, tb_name, act_type='replace', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)
Save a pandas.DataFrame into a MySQL database.
- Parameters:
df (pandas.DataFrame) – data to save
conn (pymysql.connect) – pymysql.connect database connection object
tb_name (str) – name of the target table
act_type (str) – how to save the data:
'ignore_tb': do nothing if the table already exists;
'new': create the table anew (an existing table is dropped and recreated);
'clear': first clear the existing data (if the table exists);
'insert': insert directly;
'replace': update rows that already exist; rows and columns that do not exist are inserted;
'insert_ignore': existing rows are left untouched; missing rows are inserted;
'insert_newcols' (use with caution): for existing columns behaves like plain insert (existing rows do not raise an error, so data may be lost); new columns are added and filled;
'insert_ignore_newcols': for existing columns behaves like insert_ignore; new columns are added and filled
cols (None, list) – columns to save
db_name (str) – name of the target database
na_val (None, bool, str) – fill value for NA values in df
idcols (None, str, list) – unique-key (identifier) columns; only takes effect when the table does not exist and has to be created
col_types (dict) – explicitly specified column types
Examples
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> db_name = 'test'
>>> tb_name = 'test1'
>>> tb_name2 = 'test2'
>>> df = pd.DataFrame({'code': ['001', '002', '003'],
...                    'year': ['2011', '2012', '2013'],
...                    'value': [1, 2, 3]})
>>> df_to_sql(df, conn, tb_name, act_type='new', db_name=db_name)
>>> df_to_sql(df.rename(columns={'value': 'VALUE'}), conn, tb_name,
...           act_type='replace', db_name=db_name)
>>> df_to_sql(df, conn, tb_name2, act_type='new', db_name=db_name,
...           idcols=['code', 'year'])
>>> df2 = pd.DataFrame({'code': ['002', '003', '005'],
...                     'value': [2, 4, 5],
...                     'year': ['2012', '2014', '2015'],
...                     'value2': [3, 5, 6]})
>>> df_to_sql(df2, conn, tb_name, act_type='insert',
...           db_name=db_name, cols='value2')
>>> df_to_sql(df2, conn, tb_name, act_type='replace',
...           db_name=db_name, cols='value2')
>>> df_to_sql(df, conn, tb_name, act_type='new', db_name=db_name)
>>> set_primary_key(conn, tb_name, 'code', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='replace', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert_ignore', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert_ignore',
...           cols='value2', db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='insert_ignore',
...           cols=['code', 'value2'], db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='replace',
...           cols=['code', 'value2'], db_name=db_name)
>>> df_to_sql(df2, conn, tb_name, act_type='replace',
...           cols=['code', 'year', 'value2'], db_name=db_name)
>>> modify_cols_type(conn, 'test1',
...                  ('code VARCHAR(20)',
...                   'year VARCHAR(10) DEFAULT "XX"'),
...                  db_name)
>>> df3 = pd.DataFrame({'code': ['006', '007', '008'],
...                     'value': [6, 7, 8],
...                     'value2': [7, 8, 9]})
>>> df_to_sql(df3, conn, tb_name, act_type='replace', db_name=db_name)
>>> import numpy as np
>>> conn = get_conn(password='xxxxxxxxxxx')
>>> df4 = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                     'id2': [2, 3, 4, 5, 6],
...                     'col1': ['a', 'b', 'c', 'd', 'e'],
...                     'col2': [2, 4, 6, 8, 10]})
>>> df5 = pd.DataFrame({'id1': [3, 4, 5, 6, 7],
...                     'id2': [4, 5, 6, 7, 8],
...                     'col1': ['c', 'ddd', np.nan, 'f', 'g'],
...                     'col3': [6, 8, 10, 12, 14]})
>>> df_to_sql(df4, conn, 'test3', act_type='new',
...           db_name='test', idcols=['id1', 'id2'])
>>> df_to_sql(df5, conn, 'test3',
...           act_type='insert',
...           db_name='test', idcols=['id1', 'id2'])
>>> # Test 2
>>> conn = get_conn(password='xxxxxxxxxxx', database='test')
>>> tb_name = 'test6'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df0 = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                     'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                     'value': [1, 2, 3, 4, 5, 6],
...                     'value1': [1, 2, '3', 4, 5, 6],
...                     'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, np.nan, np.inf, -np.inf],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     'VALUE3': ['10a', '11b', 12, '13']})
>>> df_to_sql(df, conn, tb_name, act_type='new', idcols=idcols)
>>> df_to_sql(df1, conn, tb_name, act_type='replace', idcols=idcols)
df_to_sql_by_row
- dramkit.sqltools.py_mysql.df_to_sql_by_row(df, conn, tb_name, act_type='insert', db_name=None, cols=None, idcols=None, col_types={}, na_val=None, inf_val='inf', _inf_val='-inf', logger=None, **kwargs_cols)
Save a pandas.DataFrame into a MySQL database row by row (new or missing columns are not handled).
For the parameters, see df_to_sql().
Note
Whether a row already exists is determined by the primary key or unique index, so when the columns being inserted are only a subset of the existing columns, use 'replace' with caution in this function (it may turn existing values into NULL).
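The caveat follows from how MySQL's REPLACE works: the matching row is deleted and the new row is inserted whole, so any column absent from the partial insert comes back as NULL. A small stdlib simulation of that semantics:

```python
def mysql_replace(table, row, idcols, all_cols):
    """Simulate REPLACE INTO on a list-of-dicts table: drop any row
    matching on the id columns, then insert the new row, filling
    unspecified columns with None (i.e. SQL NULL)."""
    key = tuple(row[c] for c in idcols)
    table[:] = [r for r in table
                if tuple(r[c] for c in idcols) != key]
    table.append({c: row.get(c) for c in all_cols})

# An existing row with value2 = 9 ...
table = [{'code': '001', 'year': '2011', 'value': 1, 'value2': 9}]
# ... replaced by a partial row that omits value2:
mysql_replace(table, {'code': '001', 'year': '2011', 'value': 5},
              idcols=['code', 'year'],
              all_cols=['code', 'year', 'value', 'value2'])
# value2 of the original row has been lost (it is now None/NULL)
```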
gen_simple_proc_sql_str
- dramkit.sqltools.py_mysql.gen_simple_proc_sql_str(sql_list, proc_name=None, with_call=True, delimiter=False)
Generate a MySQL stored-procedure statement.
Note
The DELIMITER statement raises an error when executed through pymysql (reason unknown), so if the generated SQL is to be executed with pymysql, delimiter should be set to False.
ROLLBACK only affects DML statements (INSERT, DELETE, UPDATE; not SELECT).
(DDL, Data Definition Language: CREATE, ALTER, DROP)
(DML, Data Manipulation Language: INSERT, DELETE, UPDATE, SELECT)
(DCL, Data Control Language: GRANT, ROLLBACK, COMMIT)
Examples
>>> DB = PyMySQL(host='localhost',
...              user='root',
...              password='xxxxxxxxxxx',
...              database=None,
...              port=3306,
...              logger=None)
>>> import numpy as np
>>> df4 = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                     'id2': [2, 3, 4, 5, 6],
...                     'col1': ['a', 'b', 'c', 'd', 'e'],
...                     'col2': [2, 4, 6, 8, 10]})
>>> DB.df_to_sql(df4, 'test3', 'new',
...              db_name='test', idcols=['id1', 'id2'])
>>> delimiter = False
>>> sql_list = ['ALTER TABLE test3 ADD col3 VARCHAR(10);',
...             'INSERT INTO test3 VALUES (8, 9, 10, 11, 12);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # Executing this sql adds one column and one row to test3
>>> sql_list = ['ALTER TABLE test3 ADD col4 VARCHAR(10);',
...             'INSERT INTO test3 VALUES (8, 9, 10, 11, 12, 13);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # Executing this sql adds a column (ROLLBACK has no effect on column addition)
>>> sql_list = ['INSERT INTO test3 VALUES (9, 10, 11, 12, 14, 14);',
...             'INSERT INTO test3 VALUES (8, 9, 10, 11, 12, 13);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # Executing this sql leaves test3 unchanged
>>> sql_list = ['INSERT INTO test3 VALUES (9, 10, 11, 12, 14, 14);',
...             'INSERT INTO test3 VALUES (10, 11, 12, 13, 14, 15);']
>>> sql = gen_simple_proc_sql_str(sql_list, delimiter=delimiter)
>>> # Executing this sql adds two rows to test3
sql_alchemy
SQLAlchemy
- class dramkit.sqltools.sql_alchemy.SQLAlchemy(dialect='mysql', driver='pymysql', username='root', password=None, host='localhost', port=3306, database=None, orcl_pdb=False, **kwargs)
Bases:
object
- copy()
- df_to_oracle(df, tb_name, act_type='replace', idcols=None, col_types={}, **kwargs_cols)
Save a df into Oracle.
Examples
>>> db = SQLAlchemy(dialect='oracle', driver='cx_oracle',
...                 username='test', password='xxxxxxxxxxx',
...                 host='localhost', port=1521,
...                 orcl_pdb=True, database='orclpdb')
>>> tb_name = 'test3'
>>> # idcols = None
>>> idcols = ['code', 'year']
>>> df = pd.DataFrame({'code': ['001', '002', '003', '004', '005', '006'],
...                    'year': ['2011', '2012', '2013', '2014', '2015', '2016'],
...                    'value': [1, 2, 3, 4, 5, 6],
...                    # 'value1': [1, 2, '3', 4, 5, 6],
...                    'value0': ['1a', '2b', '3c', '4d', '5e', '6f']})
>>> df1 = pd.DataFrame({'code': ['006', '007', '008', '009'],
...                     'year': ['2016', '2017', '2018', '2019'],
...                     'value': [66, 7, 8, 9],
...                     'VALUE2': [10, 11, 12, 13],
...                     # 'VALUE3': ['10a', '11b', '12', '13']
...                     })
>>> db.df_to_sql(df, tb_name, act_type='new', idcols=idcols)
>>> db.df_to_sql(df1, tb_name, act_type='replace', idcols=idcols)
- df_to_sql(*args, **kwargs)
- drop_table(tb_name, purge=True)
Drop a table.
- get_fields(tb_name)
Get the list of field names of a table.
- get_tables()
- has_table(tb_name)
- oracle_merge_into(tb_tgt, tb_src, cols, idcols, rep_keep='src')
get_conn
- dramkit.sqltools.sql_alchemy.get_conn(dialect='mysql', driver='pymysql', username='root', password=None, host='localhost', port=3306, database=None, orcl_pdb=False, **kwargs)
TODO: handle the service_name connection parameter for Oracle?
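For reference, the distinction behind that TODO: SQLAlchemy's cx_Oracle URLs treat a plain trailing name as a SID, while pluggable databases (like orclpdb above) need a ?service_name= query parameter instead. A hedged sketch of URL construction under that assumption (a hypothetical helper, not get_conn itself):

```python
def oracle_url(username, password, host, port, database,
               service_name=False):
    """Build a SQLAlchemy connection URL for the cx_Oracle driver.

    With service_name=True the database name is passed as
    ?service_name=... (needed for PDBs); otherwise it is appended
    to the path and interpreted as a SID.
    """
    base = 'oracle+cx_oracle://{}:{}@{}:{}'.format(
        username, password, host, port)
    if service_name:
        return base + '/?service_name={}'.format(database)
    return base + '/{}'.format(database)
```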
get_cols_info_df_oracle
- dramkit.sqltools.sql_alchemy.get_cols_info_df_oracle(df, cols=None, col_types={}, all2str=False, big_text_cols=[])
Infer the Oracle field type for the columns cols of a pd.DataFrame.
- Parameters:
df (pandas.DataFrame) – data whose column types are to be inferred
cols (list, None) – list of column names to inspect; defaults to all columns
col_types (dict) – explicitly specified column types, e.g. {'col1': 'NVARCHAR(20)', 'col2': 'INT'}; specified columns are returned as-is without inference
all2str (bool) – if True, all columns are treated as text
big_text_cols (str, list) – columns to treat as long text; if 'all', every text column is treated as long text; if a list, only the listed columns are
- Returns:
cols_info (str) – column type info, formatted like 'col1 col1_type, col2 col2_type, …'
dtype (dict) – the same column type info as a dict
placeholder (str) – placeholder string, formatted like ':1, :2, …'