dramkit
cryptotools
expand
- dramkit.cryptotools.expand(text, baselen=16, encoding='utf-8')
- 如果text不足minlen位的倍数就用空格补足注:utf-8编码中一个汉字占3个字节
en_aes_cbc
- dramkit.cryptotools.en_aes_cbc(text, key=None, iv=None, encoding='utf-8')
AES CBC加密
de_aes_cbc
- dramkit.cryptotools.de_aes_cbc(text, key=None, iv=None, encoding='utf-8')
AEC CBC解密
en_aes_ecb
- dramkit.cryptotools.en_aes_ecb(text, key=None, encoding='utf-8')
AES ECB加密
de_aes_ecb
- dramkit.cryptotools.de_aes_ecb(text, key=None, encoding='utf-8')
AES ECB加密
en_base64
- dramkit.cryptotools.en_base64(text, encoding='utf-8')
base64加密
de_base64
- dramkit.cryptotools.de_base64(text, encoding='utf-8')
base64解密
en_base64_url
- dramkit.cryptotools.en_base64_url(text, encoding='utf-8')
base64加密
de_base64_url
- dramkit.cryptotools.de_base64_url(text, encoding='utf-8')
base64解密
gen_random_token
- dramkit.cryptotools.gen_random_token(length=64, spec_chars=False, rand_seed=None)
gen_random_tokens
- dramkit.cryptotools.gen_random_tokens(n, **kwargs)
datetimetools
pd_str2datetime
- dramkit.datetimetools.pd_str2datetime(o, **kwargs)
pd.to_datetime函数封装,排除o为时间戳(统一按str处理)
datetime2str
- dramkit.datetimetools.datetime2str(dt, strformat=None, tz='local')
datatime格式转为str格式
- Parameters:
dt (datetime.datetime, datetime.date, time.struct_time, pd.Timestamp) – datetime、time或pandas格式时间
strformat (None, bool, str) –
设置返回文本格式:
False, 直接返回dt
’timestamp’, 返回timestamp
’int’, 返回int格式日期,如’20221106’
None, 默认’%Y-%m-%d %H:%M:%S’格式
指定其他文本格式
tz (str) – 指定时区,可选[‘local’, ‘utc’]
Examples
>>> dt1, dt2 = datetime.datetime.now(), time.localtime() >>> dt3 = pd_str2datetime(datetime_now()) >>> datetime2str(dt1, 'timestamp', 'utc') >>> datetime2str(dt2, 'timestamp', 'utc') >>> datetime2str(dt3, 'timestamp', 'utc') >>> dt4 = time.gmtime() >>> datetime2str(dt3, 'timestamp', 'utc') >>> datetime2str(dt3, 'timestamp')
dtseries2str
- dramkit.datetimetools.dtseries2str(series, joiner='-', strformat=None)
pd.Series转化为str,若不指定strformat,则按joiner连接年月日
get_datetime_strformat
- dramkit.datetimetools.get_datetime_strformat(tstr, ymd=['-', '', '.', '/'], ymd_hms=[' ', '', '.'], hms=[':', ''], hms_ms=['.', ''])
- 获取常用的文本时间格式format,支持包含以下几个部分的时间类型:年月日、年月日时分秒、年月日时分、年月日时分秒毫秒、年月日时、年月
Todo
改为用正则表达式匹配(根据连接符、数字占位限制和数字大小限制来匹配)
- Parameters:
ymd (list) – 年月日连接符
ymd_hms (list) – 年月日-时分秒连接符
hms (list) – 时分秒连接符
hms_ms (list) – 时分秒-毫秒连接符
str2datetime
- dramkit.datetimetools.str2datetime(tstr, strformat=None)
时间字符串转datetime格式
str2timestamp
- dramkit.datetimetools.str2timestamp(t, strformat=None, tz='local')
- 时间字符串转时间戳若大于16位(纳秒级),返回string,否则返回float(毫秒级)
timestamp2str
- dramkit.datetimetools.timestamp2str(t, strformat=None, tz='local', method=2)
- 时间戳转化为字符串格式注意:t小于0时直接按t为秒数处理(即使整数部分超过10位也不会处理为小数)
get_datetime_format
- dramkit.datetimetools.get_datetime_format(dt, ymd=['-', '', '.', '/'], ymd_hms=[' ', '', '.'], hms=[':', ''], hms_ms=['.', ''])
- 获取常用日期时间格式format注:整数和浮点数都可能是时间戳,这里以常用的规则来判断:- 若整数长度为8位,判断为整数型日期- 若整数或浮点数长度不小于10位不大于19位,判断为时间戳- 其他情况下的整数或浮点数报错处理
copy_format0
- dramkit.datetimetools.copy_format0(to_tm, from_tm)
- 复制日期时间格式若from_tm是日期时间格式或时间戳格式,则直接返回to_tm
x2datetime
- dramkit.datetimetools.x2datetime(x, tz='local')
- x转化为datetime格式,若x为timestamp,应设置时区tz若x为8位整数,则转化为str处理,其余情况直接用用pd.to_datetime处理
copy_format
- dramkit.datetimetools.copy_format(to_tm, from_tm)
复制日期时间格式
time_add
- dramkit.datetimetools.time_add(t, seconds=0, minutes=0, hours=0, days=0)
时间运算,return_format指定返回格式,若为False,返回datetime格式
date_add_nday
- dramkit.datetimetools.date_add_nday(date, n=1)
在给定日期date上加上n天(减去时n写成负数即可)
is_datetime
- dramkit.datetimetools.is_datetime(x)
判断x是否为时间日期
is_month_end
- dramkit.datetimetools.is_month_end(date=None, tz='local')
判断date是否为月末,若date为timestamp,应设置时区tz
is_month_start
- dramkit.datetimetools.is_month_start(date=None, tz='local')
判断date是否为月初,若date为timestamp,应设置时区tz
is_quarter_end
- dramkit.datetimetools.is_quarter_end(date=None, tz='local')
判断date是否为季末,若date为timestamp,应设置时区tz
is_quarter_start
- dramkit.datetimetools.is_quarter_start(date=None, tz='local')
判断date是否为季末,若date为timestamp,应设置时区tz
datetime_now
- dramkit.datetimetools.datetime_now(strformat=None)
获取当前时间
today_month
- dramkit.datetimetools.today_month(joiner='-')
获取今日所在年月,格式由连接符joiner确定
today_date
- dramkit.datetimetools.today_date(joiner='-')
获取今日日期,格式由连接符joiner确定
get_quarter
- dramkit.datetimetools.get_quarter(dt=None, joiner='Q')
获取日期所在季度
get_quarter_start_end_by_yq
- dramkit.datetimetools.get_quarter_start_end_by_yq(y, q, joiner='-')
get_quarter_start_end_by_date
- dramkit.datetimetools.get_quarter_start_end_by_date(date=None)
获取date日期所在季度的起始日期
today_quarter
- dramkit.datetimetools.today_quarter(joiner='Q')
获取今日所在季度
get_2part_next
- dramkit.datetimetools.get_2part_next(part1, part2, step, n=1)
Example
>>> get_2part_next(2023, 2, 4, 2) # 2023Q2往后推2个季度 (2023, 4) >>> get_2part_next(2023, 4, 4, 5) # 2023Q4往后推5个季度 (2025, 1) >>> get_2part_next(2023, 4, 12, 8) # 202304往后推8个月 (2023, 12) >>> get_2part_next(2023, 9, 12, 10) # 202309往后推10个月 (2024, 7) >>> get_2part_next(2023, 2, 4, -2) # 2023Q2往前推2个季度 (2022, 4) >>> get_2part_next(2023, 4, 4, -5) # 2023Q4往前推5个季度 (2022, 3) >>> get_2part_next(2023, 4, 12, -8) # 202304往前推8个月 (2022, 4) >>> get_2part_next(2023, 9, 12, -10) # 202309往前推10个月 (2022, 11)
get_pre_quarter
- dramkit.datetimetools.get_pre_quarter(date=None, n=1, joiner='Q')
- 获取date前第n个季度如:get_pre_quarter(“20230418”) -> “2023Q1”get_pre_quarter(“20230418”, 2) -> “2022Q4”get_pre_quarter(“20230418”, 3) -> “2022Q3”get_pre_quarter(“20230418”, 4) -> “2022Q2”get_pre_quarter(“20230418”, 5) -> “2022Q1”get_pre_quarter(“20230418”, 6) -> “2021Q4”get_pre_quarter(“20230418”, 7) -> “2021Q3”get_pre_quarter(“20230418”, 8) -> “2021Q2”
get_dayofweek
- dramkit.datetimetools.get_dayofweek(date=None)
返回date属于星期几(1-7)
get_dayofyear
- dramkit.datetimetools.get_dayofyear(date=None)
返回date属于一年当中的第几天(从1开始记)
get_month_end
- dramkit.datetimetools.get_month_end(date=None)
获取date所在月的月末日期
get_next_nmonth_start_end
- dramkit.datetimetools.get_next_nmonth_start_end(date=None, n=1)
get_next_nquarter_start_end
- dramkit.datetimetools.get_next_nquarter_start_end(date=None, n=1)
get_date_format
- dramkit.datetimetools.get_date_format(date, joiners=[' ', '-', '/', '*', '#', '@', '.', '_'])
判断日期格式位,返回日期位数和连接符
- Parameters:
date (str) – 表示日期的字符串
joiners (list) –
支持的格式连接符,不支持的连接符可能报错或返回’未知日期格式’
Note
注:若date无格式连接符,则只判断8位的格式,如’20200220’
- Returns:
tuple - (日期位数(可能为date字符串长度或’未知日期格式’), 格式连接符(或None))
date_reformat_simple
- dramkit.datetimetools.date_reformat_simple(date, joiner_ori, joiner_new='-')
时间格式简单转化
date_reformat_chn
- dramkit.datetimetools.date_reformat_chn(date, joiner='-')
指定连接符为joiner,重新格式化date,date格式为x年x月x日
gen_strformat
- dramkit.datetimetools.gen_strformat(joiner='')
date_reformat
- dramkit.datetimetools.date_reformat(date, joiner='-')
指定连接符为joiner,重新格式化date
date8_to_date10
- dramkit.datetimetools.date8_to_date10(date, joiner='-')
8位日期转换为10位日期,连接符为joiner
date10_to_date8
- dramkit.datetimetools.date10_to_date8(date)
10位日期转换为8位日期
diff_time_second
- dramkit.datetimetools.diff_time_second(time1, time2)
- 计算两个时间的差:time1-time2,time1和time2的格式应相同返回两个时间差的秒数
diff_days_date
- dramkit.datetimetools.diff_days_date(date1, date2)
计算两个日期间相隔天数,若date1大于date2,则输出为正,否则为负
get_dates_between
- dramkit.datetimetools.get_dates_between(date1, date2, keep1=False, keep2=True, only_workday=False, del_weekend=False, joiner=2)
获取date1到date2之间的日期列表,keep1和keep2设置结果是否保留date1和date2
Note
是否为workday用了chncal包,若chncal库没更新,可能导致结果不准确
get_work_dates_chncal
- dramkit.datetimetools.get_work_dates_chncal(start_date, end_date=None)
利用chncal获取指定范围内的工作日列表
get_recent_workday_chncal
- dramkit.datetimetools.get_recent_workday_chncal(date=None, dirt='post')
若date为工作日,则返回,否则返回下一个(post)或上一个(pre)工作日
Note
若chncal库没更新,可能导致结果不准确
get_next_nth_workday_chncal
- dramkit.datetimetools.get_next_nth_workday_chncal(date=None, n=1)
- 给定日期date,返回其后第n个工作日日期,n可为负数(返回结果在date之前)若n为0,直接返回date
Note
若chncal库没更新,可能导致结果不准确
get_date_weekday
- dramkit.datetimetools.get_date_weekday(weekday, n_week=0, joiner='')
获取指定星期的日期
- Parameters:
weekday (int) – 指定星期,如5表示星期5,7表示星期天
n_week (int) – 表示距离现在第几个星期,0表示当前星期,大于0表示往后推,小于0表示往前推
Examples
>>> get_date_weekday(3) # 获取本周三的日期 >>> get_date_weekday(6, -1) # 获取上周六的日期 >>> get_date_weekday(7, -1) # 获取下周日的日期
get_recent_inweekday
- dramkit.datetimetools.get_recent_inweekday(date=None, dirt='post')
若date不是周末,则返回date,否则返回下一个(post)或上一个(pre)周内日期
get_next_nth_inweekday
- dramkit.datetimetools.get_next_nth_inweekday(date=None, n=1)
- 给定日期date,返回其后第n个周内日日期,n可为负数(返回结果在date之前)若n为0,直接返回date
cut_date
- dramkit.datetimetools.cut_date(start_date, end_date, n=500)
以间隔为n天划分start_date和end_date之间日期子集
Examples
>>> start_date, end_date, n = '20200201', '20200225', 10 >>> cut_date(start_date, end_date, n) [['20200201', '20200210'], ['20200211', '20200220'], ['20200221', '20200225']]
diff_workdays_date
- dramkit.datetimetools.diff_workdays_date(date1, date2)
计算两个日期间相隔的工作日天数,若date1大于date2,则输出为正,否则为负
gentools
General toolboxs
TimeRecoder
DirtModifyError
- class dramkit.gentools.DirtModifyError
Bases:
Exception
GenObject
- class dramkit.gentools.GenObject(dirt_modify=True, **kwargs)
Bases:
object
创建一个数据类型,用于存放变量值,既可以用.key调用,也可以像字典一样调用[key]调用- __init__(dirt_modify=True, **kwargs)
初始化
- clear()
清空所有属性变量
- copy()
- property dirt_modify
- property items
- property keys
显示所有key
- listitems()
- listkeys()
- merge(o)
从另一个对象中合并属性和值
- pop(key)
删除属性变量key,有返回
- remove(key)
删除属性变量key,无返回
- set_dirt_modify(dirt_modify)
- set_from_dict(d)
通过dict批量增加属性变量
- set_key_value(key, value)
- update(o)
像dict一样通过update函数传入字典进行更新
DeprecatedError
- class dramkit.gentools.DeprecatedError
Bases:
Exception
StructureObject
- class dramkit.gentools.StructureObject(*args, **kwargs)
Bases:
object
run_func_with_timeout_thread
- dramkit.gentools.run_func_with_timeout_thread(func, *args, timeout=10, logger_error=False, logger_timeout=None, timeout_show_str=None, kill_when_timeout=True, **kwargs)
- 限定时间(timeout秒)执行函数func,若限定时间内未执行完毕,返回Noneargs为tuple或list,为func函数接受的参数列表注:在线程中强制结束函数可能导致未知错误,比如文件资源打开了但是强制结束时不能关闭
with_timeout_thread
- dramkit.gentools.with_timeout_thread(timeout=30, logger_error=None, logger_timeout=None, timeout_show_str=None, kill_when_timeout=True)
- 作为装饰器在指定时间timeout(秒)内运行函数,超时则结束运行通过控制线程实现
Examples
1import os 2import pandas as pd 3from dramkit.gentools import tmprint 4 5df1 = pd.DataFrame([[1, 2], [3, 4]]) 6df2 = pd.DataFrame([[5, 6], [7, 8]]) 7df1.to_excel('./_test/with_timeout_thread_test_df.xlsx') 8TIMEOUT = 3 9 10@with_timeout_thread(TIMEOUT, 11 logger_error=False 12 ) 13def func(x): 14 with open('./_test/with_timeout_thread_test_df.xlsx') as f: 15 tmprint('sleeping...') 16 time.sleep(5) 17 df2.to_excel('./_test/with_timeout_thread_test_df.xlsx') 18 return x 19 20def test(): 21 res = func('test') 22 print('res:', res) 23 os.remove('./_test/with_timeout_thread_test_df.xlsx') 24 return res
>>> res = test()
try_repeat_run
- dramkit.gentools.try_repeat_run(n_max_run=3, logger=None, sleep_seconds=0, force_rep=False)
- 作为装饰器尝试多次运行指定函数使用场景:第一次运行可能出错,需要再次运行(比如取数据时第一次可能连接超时,需要再取一次)
- Parameters:
n_max_run (int) – 最多尝试运行次数
logger (None, logging.Logger) – 日志记录器
sleep_seconds (int, float) –
多次执行时,上一次执行完成之后需要暂停的时间(秒)注:在force_rep为True时,每次执行完都会暂停,force_rep为False只有报错之后才会暂停force_rep (bool) – 若为True,则不论是否报错都强制重复执行,若为False,则只有报错才会重复执行
- Returns:
result – 若目标函数执行成功,则返回执行结果;若失败,则返回None
- Return type:
None, other
Examples
1from dramkit import simple_logger 2logger = simple_logger() 3 4@try_repeat_run(2, logger=logger, sleep_seconds=0, force_rep=False) 5def rand_div(x): 6 return x / np.random.randint(-1, 1) 7 8def repeat_test(info_): 9 print(info_) 10 return rand_div(0)
>>> a = repeat_test('repeat test...') >>> print(a)
capture_print
- dramkit.gentools.capture_print(logger=None, del_last_blank=True)
作为装饰器捕获函数中的print内容
- Parameters:
logger (None, logging.Logger) – 日志记录器,若为None,则会优先使用被调用函数参数中的logger
Examples
1from dramkit import simple_logger 2logger = simple_logger() 3 4@capture_print(logger) 5def test_print(): 6 print('test_print...') 7 8@capture_print() 9def test_print1(logger=None): 10 print('test_print1...') 11 12@capture_print(simple_logger(logname='test')) 13def test_print2(logger=None): 14 print('test_print2...') 15 16@capture_print(False) 17def test_print3(logger=None): 18 print('test_print3...')
>>> test_print() test_print... [INFO: 2023-04-29 10:45:41,671] >>> test_print1(logger=simple_logger('./_test/capture_print_test.log')) >>> test_print2(logger=simple_logger('./_test/capture_print_test.log')) >>> test_print3()
log_used_time
- dramkit.gentools.log_used_time(logger=None)
作为装饰器记录函数运行用时
- Parameters:
logger (None, logging.Logger) – 日志记录器,若为None,则会优先使用被调用函数参数中的logger
Examples
1from dramkit import simple_logger 2logger = simple_logger() 3 4@log_used_time(logger) 5def wait(): 6 print('wait...') 7 time.sleep(3) 8 9@log_used_time() 10def wait1(logger=None): 11 print('wait...') 12 time.sleep(3) 13 14@log_used_time(simple_logger(logname='test')) 15def wait2(logger=None): 16 print('wait...') 17 time.sleep(3) 18 19@log_used_time(False) 20def wait3(logger=None): 21 print('wait...') 22 time.sleep(3)
>>> wait() wait... func `wait` run time: 3.0s. [INFO: 2023-04-20 13:39:37,292] >>> wait1(logger=simple_logger('./_test/log_used_time_test.log')) >>> wait2(logger=simple_logger('./_test/log_used_time_test.log')) >>> wait3()
See also
References
print_used_time
- dramkit.gentools.print_used_time(func)
作为装饰器打印函数运行用时
- Parameters:
func (function) – 需要记录运行用时的函数
Examples
1from dramkit import simple_logger 2 3@print_used_time 4def wait(x, **kwargs): 5 print('wait...') 6 time.sleep(x)
>>> wait(3) wait... func `wait` run time: 3.0s. >>> wait(x=3, b=5, logger=simple_logger()) wait... func `wait` run time: 3.0s. [INFO: 2023-04-20 13:22:32,643]
See also
References
func_res_process_decorator
- dramkit.gentools.func_res_process_decorator(func_res, *args_res, **kwargs_res)
作为装饰器处理函数输出结果
Examples
>>> def res_process(res, k, b): ... return res * k + b ... ... @func_res_process_decorator(res_process, 2, 5) ... def a(x, y): ... return x+y >>> a(2, 3) 15
func_runtime_test
- dramkit.gentools.func_runtime_test(func, n=10000, return_all=False, *args, **kwargs)
函数性能(运行时间)测试,n设置测试运行测试
check_list_arg
- dramkit.gentools.check_list_arg(arg, allow_none=False)
检查arg,若其不是list或tuple,则转为列表
get_update_kwargs
- dramkit.gentools.get_update_kwargs(key, arg, kwargs, arg_default=None, func_update=None)
取出并更新kwargs中key参数的值
使用场景:当一个函数接受**kwargs参数,同时需要对kwargs里面的某个key的值进行更新并且 提取出来单独传参
- Parameters:
key – kwargs中待取出并更新的关键字
arg – 关键字key对应的新值
kwargs (dict) – 关键字参数对
arg_default (key关键词对应参数默认值) –
func_update (None, False, function) –
自定义取出的key参数值更新函数: arg_new = func_update(arg, arg_old)
若为False, 则不更新,直接取出原来key对应的参数值或默认值
若为`replace`, 则直接替换更新
若为None, 则 默认 更新方式为:
若参数值arg为dict或list类型,则增量更新
若参数值arg_old为list且arg不为list,则增量更新
其它情况直接替换更新
- Returns:
arg_new – 取出并更新之后的关键字key对应参数值
kwargs – 删除key之后的kwargs
Examples
>>> key, arg = 'a', 'aa' >>> kwargs = {'a': 'a', 'b': 'b'} >>> get_update_kwargs(key, arg, kwargs) ('aa', {'b': 'b'}) >>> key, arg = 'a', {'a_': 'aa__'} >>> kwargs = {'a': {'a': 'aa'}, 'b': 'b'} >>> get_update_kwargs(key, arg, kwargs) ({'a': 'aa', 'a_': 'aa__'}, {'b': 'b'}) >>> key, arg = 'a', ['aa', 'aa_'] >>> kwargs = {'a': ['a'], 'b': 'b'} >>> get_update_kwargs(key, arg, kwargs) (['a', 'aa', 'aa_'], {'b': 'b'}) >>> key, arg = 'a', ['aa', 'aa_'] >>> kwargs = {'a': ['a'], 'b': 'b'} >>> get_update_kwargs(key, arg, kwargs, func_update='replace') (['aa', 'aa_'], {'b': 'b'}) >>> key, arg = 'a', 'aa_' >>> kwargs = {'a': ['a'], 'b': 'b'} >>> get_update_kwargs(key, arg, kwargs) (['a', 'aa_'], {'b': 'b'})
roulette_base
- dramkit.gentools.roulette_base(fitness)
基本轮盘赌法
- Parameters:
fitness (list) –
所有备选对象的fitness值列表
Note
fitness的元素值应为正,且fitness值越大,被选中概率越大
- Returns:
int - 返回被选中对象的索引号
References
roulette_stochastic_accept
- dramkit.gentools.roulette_stochastic_accept(fitness)
轮盘赌法,随机接受法
- Parameters:
fitness (list) –
所有备选对象的fitness值列表
Note
fitness的元素值应为正,且fitness值越大,被选中概率越大
- Returns:
int - 返回被选中对象的索引号
References
roulette_count
- dramkit.gentools.roulette_count(fitness, n=10000, rand_func=None)
轮盘赌法n次模拟,返回每个备选对象在n次模拟中被选中的次数
- Parameters:
fitness (list, dict) –
所有备选对象的fitness值列表或字典,格式参见Example
Note
fitness的元素值应为正,且fitness值越大,被选中概率越大
n (int) – 模拟次数
rand_func (None, function) –
指定轮盘赌法函数,如’roulette_base’(dramkit.gentools.roulette_base()
)或’roulette_stochastic_accept’(dramkit.gentools.roulette_stochastic_accept()
),默认用’roulette_stochastic_accept’
- Returns:
list, dict - 返回每个对象在模拟n次中被选中的次数
Examples
>>> fitness = [1, 2, 3] >>> roulette_count(fitness, n=6000) [(0, 991), (1, 2022), (2, 2987)] >>> fitness = (1, 2, 3) >>> roulette_count(fitness, n=6000) [(0, 1003), (1, 1991), (2, 3006)] >>> fitness = [('a', 1), ('b', 2), ('c', 3)] >>> roulette_count(fitness, n=6000) [('a', 997), ('b', 1989), ('c', 3014)] >>> fitness = [['a', 1], ['b', 2], ['c', 3]] >>> roulette_count(fitness, n=6000) [('a', 1033), ('b', 1967), ('c', 3000)] >>> fitness = {'a': 1, 'b': 2, 'c': 3} >>> roulette_count(fitness, n=6000) {'a': 988, 'b': 1971, 'c': 3041}
rand_sum
- dramkit.gentools.rand_sum(target_sum, n, lowests, highests, isint=True, n_dot=6)
在指定最大最小值范围内随机选取若干个随机数,所选取数之和为定值
- Parameters:
target_sum (int, float) – 目标和
n (int) – 随机选取个数
lowests (int, floot, list) – 随机数下限值,若为list,则其第k个元素对应第k个随机数的下限
highests (int, floot, list) – 随机数上限值,若为list,则其第k关元素对应第k个随机数的上限
isint (bool) –
所选数是否强制为整数,若为False,则为实数
Note
若输入lowests或highests不是int,则isint为True无效
n_dot (int) – 动态上下界值与上下限比较时控制小数位数(为了避免python精度问题导致的报错)
- Returns:
list - 随机选取的n个数,其和为target_sum
Examples
>>> rand_sum(100, 2, [20, 30], 100) [65, 35] >>> rand_sum(100, 2, 20, 100) [41, 59] >>> rand_sum(100, 2, [20, 10], [100, 90]) [73, 27]
rand_weight_sum
- dramkit.gentools.rand_weight_sum(weight_sum, n, lowests, highests, weights=None, n_dot=6)
在指定最大最小值范围内随机选取若干个随机数,所选取数之加权和为定值
- Parameters:
weight_sum (float) – 目标加权和
n (int) – 随机选取个数
lowests (int, floot, list) – 随机数下限值,若为list,则其第k个元素对应第k个随机数的下限
highests (int, floot, list) – 随机数上限值,若为list,则其第k关元素对应第k个随机数的上限
weights (None, list) –
权重列表
Note
lowests和highests与weights应一一对应
n_dot (int) – 动态上下界值与上下限比较时控制小数位数(为了避免python精度问题导致的报错)
- Returns:
list - 随机选取的n个数,其以weights为权重的加权和为weight_sum
Examples
>>> rand_weight_sum(60, 2, [20, 30], 100) [21.41082008017613, 98.58917991982386] >>> rand_weight_sum(70, 2, 20, 100) [56.867261610484356, 83.13273838951565] >>> rand_weight_sum(80, 2, [20, 10], [100, 90]) [80.32071140116187, 79.67928859883813] >>> rand_weight_sum(80, 2, [20, 10], [100, 90], [0.6, 0.4]) [88.70409567475888, 66.94385648786168] >>> rand_weight_sum(180, 2, [20, 10], [100, 90], [3, 2]) [23.080418085462018, 55.37937287180697]
replace_repeat_iter
- dramkit.gentools.replace_repeat_iter(series, val, val0, gap=None, keep_last=False)
替换序列中重复出现的值
series (pd.Series) 中若步长为gap的范围内出现多个val值,则只保留第一条记录, 后面的替换为val0若gap为None,则将连续出现的val值只保留第一个,其余替换为val0(这里连续出现val是指 不出现除了val和val0之外的其他值)若keep_last为True,则连续的保留最后一个返回结果为替换之后的series (pd.Series)
Examples
>>> data = pd.DataFrame([0, 1, 1, 0, -1, -1, 2, -1, 1, 0, 1, 1, 1, 0, 0, ... -1, -1, 0, 0, 1], columns=['test']) >>> data['test_rep'] = replace_repeat_iter(data['test'], 1, 0, gap=None) >>> data test test_rep 0 0 0 1 1 1 2 1 0 3 0 0 4 -1 -1 5 -1 -1 6 2 2 7 -1 -1 8 1 1 9 0 0 10 1 0 11 1 0 12 1 0 13 0 0 14 0 0 15 -1 -1 16 -1 -1 17 0 0 18 0 0 19 1 1 >>> series = pd.Series([-1, 1, -1, 0, 1, 0, 1, 1, -1]) >>> replace_repeat_iter(series, 1, 0, gap=5) 0 -1 1 1 2 -1 3 0 4 1 5 0 6 0 7 0 8 -1
replace_repeat_pd
- dramkit.gentools.replace_repeat_pd(series, val, val0, keep_last=False)
- 替换序列中重复出现的值, 仅保留第一个函数功能,参数和意义同
dramkit.gentools.replace_repeat_iter()
区别在于计算时在pandas.DataFrame里面进行而不是采用迭代方式,同时取消了gap 参数(即连续出现的val值只保留第一个)
replace_repeat_func_iter
- dramkit.gentools.replace_repeat_func_iter(series, func_val, func_val0, gap=None, keep_last=False)
- 替换序列中重复出现的值,功能与
dramkit.gentools.replace_repeat_iter()
类似,只不过把val和val0的值由直接指定换成了由指定函数生成func_val
函数用于判断连续条件,其返回值只能是True或False,func_val0
函数用于生成替换的新值。即series中若步长为gap的范围内出现多个满足func_val函数为True的值, 则只保留第一条记录,后面的替换为函数func_val0的值。若gap为None,则将连续出现的满足func_val函数为True的值只保留第一个,其余替换为函数 func_val0的值(这里连续出现是指不出现除了满足func_val为True和等于func_val0函数值 之外的其他值)返回结果为替换之后的series (pd.Series)
Examples
>>> data = pd.DataFrame({'y': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, 1, ... 1, 1, 0, 0, -1, -1, 0, 0, 1]}) >>> data['y_rep'] = replace_repeat_func_iter( ... data['y'], lambda x: x < 1, lambda x: 3, gap=None) >>> data y y_rep 0 0 0 1 1 1 2 1 1 3 0 0 4 -1 3 5 -1 3 6 2 2 7 -1 -1 8 1 1 9 0 0 10 1 1 11 1 1 12 1 1 13 0 0 14 0 3 15 -1 3 16 -1 3 17 0 3 18 0 3 19 1 1
replace_repeat_func_pd
- dramkit.gentools.replace_repeat_func_pd(series, func_val, func_val0, keep_last=False)
替换序列中重复出现的值, 仅保留第一个
函数功能,参数和意义同dramkit.gentools.replace_repeat_func_iter()
区别在于计算时在pandas.DataFrame里面进行而不是采用迭代方式同时取消了gap参数(即连续出现的满足func_val为True的值只保留第一个)
con_count
- dramkit.gentools.con_count(series, func_cond, via_pd=True)
计算series(pd.Series)中连续满足func_cond函数指定的条件的记录数
- Parameters:
series (pd.Series) – 目标序列
func_cond (function) – 指定条件的函数,func_cond(x)返回结果只能为True或False
via_pd (bool) – 若via_pd为False,则计算时使用循环迭代,否则在pandas.DataFrame里面进行计算
- Returns:
pd.Series - 返回连续计数结果
Examples
>>> df = pd.DataFrame([0, 0, 1, 1, 0, 0, 1, 1, 1], columns=['series']) >>> func_cond = lambda x: True if x == 1 else False >>> df['count1'] = con_count(df['series'], func_cond, True) >>> df series count1 0 0 0 1 0 0 2 1 1 3 1 2 4 0 0 5 0 0 6 1 1 7 1 2 8 1 3 >>> df['count0'] = con_count(df['series'], lambda x: x != 1, False) >>> df series count1 count0 0 0 0 1 1 0 0 2 2 1 1 0 3 1 2 0 4 0 0 1 5 0 0 2 6 1 1 0 7 1 2 0 8 1 3 0
con_count_ignore
- dramkit.gentools.con_count_ignore(series, func_cond, via_pd=True, func_ignore=None)
在
dramkit.gentools.con_count()
的基础上增加连续性判断条件:当series中的值满足func_ignore函数值为True时,不影响连续性判断(func_ignore 默认为
lambda x: isnull(x)
)
get_preval_func_cond
- dramkit.gentools.get_preval_func_cond(data, col_val, col_cond, func_cond)
- 获取上一个满足指定条件的行中col_val列的值,条件为:该行中col_cond列的值x满足func_cond(x)为True (func_cond(x)返回结果只能为True或False)返回结果为 pd.Series
Examples
>>> data = pd.DataFrame({'x1': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, 1, 1, 1, ... 0, 0, -1, -1, 0, 0, 1], ... 'x2': [0, 1, 1, 0, -1, -1, 1, -1, 1, 0, 1, 1, 1, ... 0, 0, -1, -1, 0, 0, 1]}) >>> data['x1_pre'] = get_preval_func_cond(data, 'x1', 'x2', lambda x: x != 1) >>> data x1 x2 x1_pre 0 0 0 NaN 1 1 1 0.0 2 1 1 0.0 3 0 0 0.0 4 -1 -1 0.0 5 -1 -1 -1.0 6 2 1 -1.0 7 -1 -1 -1.0 8 1 1 -1.0 9 0 0 -1.0 10 1 1 0.0 11 1 1 0.0 12 1 1 0.0 13 0 0 0.0 14 0 0 0.0 15 -1 -1 0.0 16 -1 -1 -1.0 17 0 0 -1.0 18 0 0 0.0 19 1 1 0.0
gap_count
- dramkit.gentools.gap_count(series, func_cond, via_pd=True)
计算series (pd.Series)中当前行距离上一个满足
func_cond
函数指定条件记录的行数func_cond为指定条件的函数,func_cond(x)返回结果只能为True或False, 若via_pd为False,则使用循环迭代,若via_pd为True,则在pandas.DataFrme内计算 返回结果为 pd.Series
Examples
>>> df = pd.DataFrame([0, 1, 1, 0, 0, 1, 1, 1], columns=['series']) >>> func_cond = lambda x: True if x == 1 else False >>> df['gap1'] = gap_count(df['series'], func_cond, True) >>> df series gap1 0 0 0 1 1 0 2 1 1 3 0 1 4 0 2 5 1 3 6 1 1 7 1 1 >>> df['gap0'] = gap_count(df['series'], lambda x: x != 1, False) >>> df series gap1 gap0 0 0 0 0 1 1 0 1 2 1 1 2 3 0 1 3 4 0 2 1 5 1 3 1 6 1 1 2 7 1 1 3
count_between_gap
- dramkit.gentools.count_between_gap(data, col_gap, col_count, func_gap, func_count, count_now_gap=False, count_now=True, via_pd=True)
计算data (pandas.DataFrame)中当前行与上一个满足
func_gap
函数为True的行之间, 满足func_count
函数为True的记录数函数func_gap作用于col_gap
列,func_count作用于col_count
列, 两者返回值均为True或Falsecount_now_gap
设置满足func_gap的行是否参与计数,若为False, 则该行计数为0,若为True,则该行按照上一次计数的最后一次计数处理Todo
增加count_now_gap的处理方式:
该行计数为0
该行按上一次计数的最后一次计数处理
该行按下一次计数的第一次计数处理
count_now
设置当当前行满足func_count时,从当前行开始对其计数还是从下一行开始对其计数Note
注:当前行若满足同时满足func_gap和func_count,对其计数的行不会为下一行 (即要么不计数,要么在当前行对其计数)
若via_pd为True,则调用
count_between_gap_pd()
实现,否则用count_between_gap_iter()
返回结果为 pd.Series
Examples
>>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1, ... 1, 0, 0, -1, -1, 0, 0, 1], ... 'to_count': [0, 1, 1, 0, -1, -1, 1, -1, 1, 0, 1, ... 1, 1, 0, 0, -1, -1, 0, 0, 1]}) >>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count', ... lambda x: x == -1, lambda x: x == 1, ... count_now_gap=False, count_now=False) >>> data to_gap to_count gap_count 0 0 0 0 1 1 1 0 2 1 1 0 3 0 0 0 4 -1 -1 0 5 -1 -1 0 6 2 1 0 7 -1 -1 0 8 1 1 0 9 0 0 1 10 -1 1 0 11 1 1 0 12 1 1 1 13 0 0 2 14 0 0 2 15 -1 -1 0 16 -1 -1 0 17 0 0 0 18 0 0 0 19 1 1 0 >>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1, 1, 0, 0, -1, -1, 0, 0, 1, -1, -1], 'to_count': [0, 1, 1, 0, -1, -1, 1, -1, 1, 0, 1, 1, 1, 0, 0, -1, 1, 0, 1, 1, 1, 1]}) >>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count', lambda x: x == -1, lambda x: x == 1, count_now_gap=False, count_now=True) >>> data to_gap to_count gap_count 0 0 0 0 1 1 1 0 2 1 1 0 3 0 0 0 4 -1 -1 0 5 -1 -1 0 6 2 1 1 7 -1 -1 0 8 1 1 1 9 0 0 1 10 -1 1 0 11 1 1 1 12 1 1 2 13 0 0 2 14 0 0 2 15 -1 -1 0 16 -1 1 0 17 0 0 0 18 0 1 1 19 1 1 2 20 -1 1 0 21 -1 1 0 >>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1, 1, 0, 0, -1, -1, 0, 0, 1, -1, -1], 'to_count': [0, -1, -1, 0, -1, -1, 1, -1, 1, 0, 1, 1, 1, 0, 0, -1, -1, 0, -1, 1, 1, 1]}) >>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count', lambda x: x == -1, lambda x: x == 1, count_now_gap=True, count_now=False) >>> data to_gap to_count gap_count 0 0 0 0 1 1 -1 0 2 1 -1 0 3 0 0 0 4 -1 -1 0 5 -1 -1 0 6 2 1 0 7 -1 -1 1 8 1 1 0 9 0 0 1 10 -1 1 1 11 1 1 0 12 1 1 1 13 0 0 2 14 0 0 2 15 -1 -1 2 16 -1 -1 0 17 0 0 0 18 0 -1 0 19 1 1 0 20 -1 1 1 21 -1 1 0 >>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1, 1, 0, 0, -1, -1, 0, 0, 1, -1, -1], 'to_count': [0, -1, -1, 0, -1, -1, 1, -1, 1, 0, 1, 1, 1, 0, 0, -1, -1, 0, -1, 1, 1, 1]}) >>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count', lambda x: x == -1, lambda x: x == 1, count_now_gap=True, count_now=True) >>> data to_gap to_count gap_count 0 0 0 0 1 1 -1 0 2 1 -1 0 3 0 0 0 4 -1 -1 0 5 -1 -1 0 6 2 1 1 7 -1 -1 1 8 1 1 1 9 0 0 1 10 -1 1 2 11 1 1 1 12 1 1 2 13 0 0 2 14 0 0 2 15 -1 -1 2 16 -1 -1 0 17 0 0 0 18 0 -1 0 19 1 1 1 20 -1 1 2 21 -1 1 1
count_between_gap_pd
- dramkit.gentools.count_between_gap_pd(data, col_gap, col_count, func_gap, func_count, count_now_gap=True, count_now=True)
参数和功能说明见
dramkit.gentools.count_between_gap()
函数
count_between_gap_iter
- dramkit.gentools.count_between_gap_iter(data, col_gap, col_count, func_gap, func_count, count_now_gap=True, count_now=True)
参数和功能说明见
dramkit.gentools.count_between_gap()
函数
val_gap_cond
- dramkit.gentools.val_gap_cond(data, col_val, col_cond, func_cond, func_val, to_cal_col=None, func_to_cal=None, val_nan=nan, contain_1st=False)
计算data (pandas.DataFrame)中从上一个
col_cond
列满足func_cond
函数的行 到当前行,col_val
列记录的func_val
函数值func_cond作用于col_cond列,func_cond(x)返回True或False,x为单个值func_val函数作用于col_val列,func_val(x)返回单个值,x为np.array或pd.Series或列表等func_to_cal作用于to_cal_col列,只有当前行func_to_cal值为True时才进行func_val计算, 否则返回结果中当前行值设置为val_nancontain_1st设置func_val函数计算时是否将上一个满足func_cond的行也纳入计算Todo
参考
dramkit.gentools.count_between_gap()
的设置:设置col_cond列满足func_cond函数的行,其参与func_val函数的前一次计算还是下一次计算还是不参与计算
Examples
>>> data = pd.DataFrame({'val': [1, 2, 5, 3, 1, 7 ,9], ... 'sig': [1, 1, -1, 1, 1, -1, 1]}) >>> data['val_pre1'] = val_gap_cond(data, 'val', 'sig', ... lambda x: x == -1, lambda x: max(x)) >>> data val sig val_pre1 0 1 1 NaN 1 2 1 NaN 2 5 -1 NaN 3 3 1 3.0 4 1 1 3.0 5 7 -1 7.0 6 9 1 9.0
filter_by_func_prenext
- dramkit.gentools.filter_by_func_prenext(l, func_prenext)
对
l
(list)进行过滤,过滤后返回的lnew
(list)任意前后相邻两个元素满足:func_prenext(lnew[i], lnew[i+1]) = True
过滤过程为:将
l
的第一个元素作为起点,找到其后第一个满足func_prenext
函数 值为True的元素,再以该元素为起点往后寻找…Examples
>>> l = [1, 2, 3, 4, 1, 1, 2, 3, 6] >>> func_prenext = lambda x, y: (y-x) >= 2 >>> filter_by_func_prenext(l, func_prenext) [1, 3, 6] >>> l = [1, 2, 3, 4, 1, 5, 1, 2, 3, 6] >>> filter_by_func_prenext(l, func_prenext) [1, 3, 5] >>> filter_by_func_prenext(l, lambda x, y: y == x+1) [1, 2, 3, 4] >>> l = [(1, 2), (2, 3), (4, 1), (5, 0)] >>> func_prenext = lambda x, y: abs(y[-1]-x[-1]) == 1 >>> filter_by_func_prenext(l, func_prenext) [(1, 2), (2, 3)]
filter_by_func_prenext_series
- dramkit.gentools.filter_by_func_prenext_series(series, func_prenext, func_ignore=None, val_nan=nan)
对series (pandas.Series)调用
filter_by_func_prenext
函数进行过滤, 其中满足func_ignore
函数为True的值不参与过滤,func_ignore函数默认为:lambda x: isnull(x)
series中 被过滤的值 在返回结果中用
val_nan
替换, 不参与过滤 的值保持不变Examples
>>> series = pd.Series([1, 2, 3, 4, 1, 1, 2, 3, 6]) >>> func_prenext = lambda x, y: (y-x) >= 2 >>> filter_by_func_prenext_series(series, func_prenext) 0 1.0 1 NaN 2 3.0 3 NaN 4 NaN 5 NaN 6 NaN 7 NaN 8 6.0 >>> series = pd.Series([1, 2, 0, 3, 0, 4, 0, 1, 0, 0, 1, 2, 3, 6], ... index=range(14, 0, -1)) >>> filter_by_func_prenext_series(series, func_prenext, lambda x: x == 0) 14 1.0 13 NaN 12 0.0 11 3.0 10 0.0 9 NaN 8 0.0 7 NaN 6 0.0 5 0.0 4 NaN 3 NaN 2 NaN 1 6.0
df_na2value
- dramkit.gentools.df_na2value(df, value=None)
- df中nan值替换为value
copy_df_structure
- dramkit.gentools.copy_df_structure(df)
复制df的结构到一个空的dataframe
get_tmp_new
- dramkit.gentools.get_tmp_new(exists, tmp_new, ext='_')
以tmp_new为基础生成一个不在exists的值
get_tmp_col
- dramkit.gentools.get_tmp_col(df, tmp_col_name)
以tmp_col_name为基础生成一个不在df的列名中的列
merge_df
- dramkit.gentools.merge_df(df_left, df_right, same_keep='left', **kwargs)
在
pd.merge
上改进,相同列名时自动去除重复的- Parameters:
df_left (pandas.DataFrame) – 待merge左表
df_right (pandas.DataFrame) – 待merge右表
same_keep (str) – 可选’left’, ‘right’,设置相同列保留左边df还是右边df
**kwargs – pd.merge接受的其他参数
- Returns:
pandas.DataFrame - 返回merge之后的数据表
update_df
- dramkit.gentools.update_df(df_old, df_new, idcols=None, del_dup_cols=None, rep_keep='new', sort_cols=None, ascendings=True, method='merge', logger=None)
- 合并df_new到df_old注意:df_old和df_new不应该设置index注意:用merge处理当数据量大时占空间很大,method应设置为`concat`
Todo
增加对各列的类型进行指定的参数设置
Examples
>>> df_old = pd.DataFrame({'id1': [1, 2, 3, 4, 5], ... 'id2': [2, 3, 4, 5, 6], ... 'col1': ['a', 'b', 'c', 'd', 'e'], ... 'col2': [2, 4, 6, 8, 10]}) >>> df_new = pd.DataFrame({'id1': [3, 4, 5, 6, 7], ... 'id2': [4, 5, 6, 7, 8], ... 'col1': ['c', 'ddd', np.nan, 'f', 'g'], ... 'col3': [6, 8, 10, 12, 14]}) ... idcols = ['id1', 'id2'] ... rep_keep = 'new' ... del_dup_cols = None#['id1', 'id2'] >>> a = update_df(df_old, df_new, ... idcols=idcols, ... del_dup_cols=del_dup_cols, ... rep_keep=rep_keep, ... method='merge') >>> b = update_df(df_old, df_new, ... idcols=idcols, ... del_dup_cols=del_dup_cols, ... rep_keep=rep_keep, ... method='concat')
dfs_concat_axis1
- dramkit.gentools.dfs_concat_axis1(dfs_list, idcols=None, ascending=True)
多个df列表横向拼接
cut_df_by_con_val
- dramkit.gentools.cut_df_by_con_val(df, by_col, func_eq=None)
根据 by_col 列的值,将 df (pandas.DataFrame) 切分为多个子集列表,返回 list
切分依据:
func_eq
函数作用于by_col
列,函数值连续相等的记录被划分到一个子集中Examples
>>> df = pd.DataFrame({'val': range(0,10), ... 'by_col': ['a']*3+['b']*2+['c']*1+['a']*3+['d']*1}) >>> df.index = ['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q'] >>> cut_df_by_con_val(df, 'by_col') [ val by_col z 0 a y 1 a x 2 a, val by_col w 3 b v 4 b, val by_col u 5 c, val by_col t 6 a s 7 a r 8 a, val by_col q 9 d]
get_con_start_end
- dramkit.gentools.get_con_start_end(series, func_con)
找出series (pandas.Series)中值连续满足
func_con
函数值为True的分段起止位置, 返回起止位置对列表Examples
>>> series = pd.Series([0, 1, 1, 0, 1, 1, 0, -1, -1, 0, 0, -1, 1, 1, 1, 1, 0, -1]) >>> start_ends = get_con_start_end(series, lambda x: x == -1) >>> start_ends [[7, 8], [11, 11], [17, 17]] >>> start_ends = get_con_start_end(series, lambda x: x == 1) >>> start_ends [[1, 2], [4, 5], [12, 15]]
cut_range_to_subs
- dramkit.gentools.cut_range_to_subs(n, gap)
将
range(0, n)
切分成连续相接的子集:[range(0, gap), range(gap, 2*gap), ...]
cut_to_subs
- dramkit.gentools.cut_to_subs(l, gap)
将列表分段,gap指定每段的个数
check_l_allin_l0
- dramkit.gentools.check_l_allin_l0(l, l0)
判断
l (list)
中的值是否都是l0 (list)
中的元素, 返回True或FalseExamples
>>> l = [1, 2, 3, -1, 0] >>> l0 = [0, 1, -1] >>> check_l_allin_l0(l, l0) False >>> l = [1, 1, 0, -1, -1, 0, 0] >>> l0 = [0, 1, -1] >>> check_l_in_l0(l, l0) True
check_exist_data
- dramkit.gentools.check_exist_data(df, x_list, cols=None)
依据指定的
cols
列检查df (pandas.DataFrame)
中是否已经存在x_list (list)
中的记录, 返回list,每个元素值为True或FalseExamples
>>> df = pd.DataFrame([['1', 2, 3.1, ], ['3', 4, 5.1], ['5', 6, 7.1]], ... columns=['a', 'b', 'c']) >>> x_list, cols = [[3, 4], ['3', 4]], ['a', 'b'] >>> check_exist_data(df, x_list, cols=cols) [False, True] >>> check_exist_data(df, [['1', 3.1], ['3', 5.1]], ['a', 'c']) [True, True]
isnull
- dramkit.gentools.isnull(x)
判断x是否为无效值(None, nan, x != x),若是无效值,返回True,否则返回False
x_div_y
- dramkit.gentools.x_div_y(x, y, v_x0=None, v_y0=0, v_xy0=1)
x除以y
v_xy0为当x和y同时为0时的返回值
v_y0为当y等于0时的返回值
v_x0为当x等于0时的返回值
power
- dramkit.gentools.power(a, b, return_real=True)
计算a的b次方,return_real设置是否只返回实属部分
log
- dramkit.gentools.log(x, bottom=None)
计算对数, bottom指定底
cal_pct
- dramkit.gentools.cal_pct(v0, v1, vv00=1, vv10=- 1)
计算从v0到v1的百分比变化
vv00为当v0的值为0且v1为正时的返回值,v1为负时取负号
vv10为当v1的值为0且v0为正时的返回值,v0为负时取负号
Todo
正负无穷大处理
pct_change
- dramkit.gentools.pct_change(series, lag=1)
计算series百分比变化
Todo
分子分母为0或正负无穷大处理
s1divs2
- dramkit.gentools.s1divs2(series1, series2, vs20=nan)
两个序列相比
min_com_multer
- dramkit.gentools.min_com_multer(l)
求一列数 l (list) 的最小公倍数,支持负数和浮点数
max_com_divisor
- dramkit.gentools.max_com_divisor(l)
求一列数 l (list) 的最大公约数,只支持正整数
Note
只支持正整数
mcd2_tad
- dramkit.gentools.mcd2_tad(a, b)
辗转相除法求a和b的最大公约数,a、b为正数
Note
a, b应为正数
a, b为小数时由于精度问题会不正确
max_com_divisor_tad
- dramkit.gentools.max_com_divisor_tad(l)
用辗转相除法求一列数 l (list) 的最大公约数, l 元素均为正数
Note
l元素均为正数
l元素为小数时由于精度问题会不正确
References
https://blog.csdn.net/weixin_45069761/article/details/107954905
df_rows2cols
- dramkit.gentools.df_rows2cols(df, col_name, col_val_name, fill_value=None)
把df中col_value_name列数据按col_name列分组,通过unstack拆分为多列数据,新列名为col_name列中的值
Todo
有nan时的检查和处理
Examples
>>> df0 = pd.DataFrame( ... [['Saliy', 'midterm', 'class1', 'A'], ... ['Saliy', 'midterm', 'class3', 'B'], ... ['Saliy', 'final', 'class1', 'C'], ... ['Saliy', 'final', 'class3', 'C'], ... ['Jeff', 'midterm', 'class2', 'D'], ... ['Jeff', 'midterm', 'class4', 'A'], ... ['Jeff', 'final', 'class2', 'E'], ... ['Jeff', 'final', 'class4', 'C'], ... ['Roger', 'midterm', 'class2', 'C'], ... ['Roger', 'midterm', 'class5', 'B'], ... ['Roger', 'final', 'class2', 'A'], ... ['Roger', 'final', 'class5', 'A'], ... ['Karen', 'midterm', 'class3', 'C'], ... ['Karen', 'midterm', 'class4', 'A'], ... ['Karen', 'final', 'class3', 'C'], ... ['Karen', 'final', 'class4', 'A'], ... ['Brain', 'midterm', 'class1', 'B'], ... ['Brain', 'midterm', 'class5', 'A'], ... ['Brain', 'final', 'class1', 'B'], ... ['Brain', 'final', 'class5', 'C']], ... columns=['name', 'test', 'class', 'grade']) >>> col_name, col_val_name = 'class', 'grade' >>> df1 = df_rows2cols(df0, col_name, col_val_name) >>> df2 = df_rows2cols(df0, col_name, col_val_name, fill_value='None')
df_cols2rows
- dramkit.gentools.df_cols2rows(df, cols, col_name, col_val_name, dropna=True)
把df中指定的cols列数据通过stack堆积为行数据,新列名为col_name
Todo
有nan时的检查和处理
Examples
>>> na = [np.nan] >>> df3 = pd.DataFrame({'name': ['Saliy', 'Saliy', 'Jeff', 'Jeff', ... 'Roger', 'Roger', 'Karen', 'Karen', ... 'Brain', 'Brain'], ... 'test': ['midterm', 'final'] * 5, ... 'class1': ['A', 'C'] + na*6 + ['B', 'B'], ... 'class2': na*2 + ['D', 'E', 'C', 'A'] + na*4, ... 'class3': ['B', 'C'] + na*4 + ['C', 'C'] + na*2, ... 'class4': na*2 + ['A', 'C'] + na*2 + ['A', 'A'] + na*2, ... 'class5': na*4 + ['B', 'A'] + na*2 + ['A', 'C']}) >>> cols = ['class%s'%x for x in range(1, 6)] >>> col_name = 'class' >>> col_val_name = 'grade' >>> df4 = df_cols2rows(df3, cols, col_name, col_val_name) >>> df5 = df_cols2rows(df3, cols, col_name, col_val_name, ... dropna=False)
get_full_df
- dramkit.gentools.get_full_df(df: DataFrame, full_col: str, fulls: Iterable, idcols: Optional[Union[str, list[str]]] = None, vcols: Optional[Union[str, list[str]]] = None, val_nan: Optional[Any] = None, fill: Optional[FillMethodType] = 'ffill', final_dropna: bool = True, only_fulls: bool = True, method: FullMethodType = 'pivot')
df中full_col列的值扩充到fulls
Example
>>> df = pd.DataFrame({'t': [2, 3, 5, 7, 9], ... 'a': ['x1', 'x1', 'x3', 'x4', 'x4'], ... 'b': ['y1', 'y2', 'y3', 'y4', 'y4'], ... 'c': ['z1', 'z1', 'z5', 'z7', 'z9']}) >>> get_full_df(df, 't', range(1, 11)) >>> get_full_df(df, 't', range(1, 11), idcols='c', method='product') >>> get_full_df(df, 't', range(1, 11), idcols=['a', 'b'], method='product') >>> get_full_df(df, 't', range(1, 11), idcols=['a', 'b']) >>> df['c'] = ['z1', 'z1', np.nan, np.nan, 'z9'] >>> get_full_df(df, 't', range(1, 11), idcols=['a', 'b'], val_nan='npnan') >>> get_full_df(df, 't', [2, 5, 9, 10], idcols=['a', 'b'], val_nan='npnan', final_dropna=False) >>> get_full_df(df, 't', [2, 5, 9, 10], idcols=['a', 'b'], val_nan='npnan', final_dropna=True, method='product')
get_full_components_df
- dramkit.gentools.get_full_components_df(df: DataFrame, parent_col: str, child_col: str, tincol: str, toutcol: str, tfulls: Iterable, toutnan: DateTimeType, tcol_res: str = 'time', keep_inout: bool = False)
- 根据纳入、退出日期获取在给定时间内的所有成分
Example
>>> df = pd.DataFrame( ... {'idxname': ['a', 'a', 'b', 'b', 'a', 'c', 'c'], ... 'stock': ['a1', 'a2', 'b1', 'b2', 'a3', 'c1', 'c2'], ... 'indate': ['20210101', '20210515', '20210405', '20210206', ... '20220307', '20220910', '20230409'], ... 'outdate': ['20220305', np.nan, np.nan, '20230209', ... np.nan, np.nan, '20230518']}) >>> parent_col, child_col, tincol, toutcol = 'idxname', 'stock', 'indate', 'outdate' >>> from finfactory.fintools.utils_chn import get_dates_conds >>> tfulls = get_dates_conds('quarter_end', '20210101') >>> tfulls = [x.strftime('%Y%m%d') for x in tfulls] >>> toutnan = '20991231' >>> df1 = get_full_components_df( ... df, parent_col, child_col, tincol, toutcol, ... tfulls, toutnan, tcol_res='time')
get_first_appear
- dramkit.gentools.get_first_appear(series, func_cond, reverse=False, return_iloc=False)
- 获取满足func_cond(x)为True的值在series中第一次出现时的索引若reverse为True,则是最后一次出现若return_iloc为True,则返回行号和值,否则返回索引和值
get_appear_order
- dramkit.gentools.get_appear_order(series, ascending=True)
标注series (pandas.Series , 离散值)中重复元素是第几次出现,
返回为 pandas.Series,ascending设置返回结果是否按出现次序升序排列
Examples
>>> df = pd.DataFrame({'v': ['A', 'B', 'A', 'A', 'C', 'C']}) >>> df.index = ['a', 'b', 'c', 'd', 'e', 'f'] >>> df['nth'] = get_appear_order(df['v'], ascending=False) >>> df v nth a A 3 b B 1 c A 2 d A 1 e C 2 f C 1
label_rep_index_str
- dramkit.gentools.label_rep_index_str(df)
df (pandas.DataFrame) 中的index若有重复,对重复的index进行后缀编号,返回新的 pandas.DataFrame
Note
若存在重复的index,则添加后缀编号之后返回的df,其index为str类型
Examples
>>> df = pd.DataFrame([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) >>> label_rep_index_str(df) 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 10 >>> df.index = [0, 0, 0, 1, 1, 2, 2, 2, 2, 3] >>> label_rep_index_str(df) 0 0 1 0_2 2 0_3 3 1 4 1_2 5 2 6 2_2 7 2_3 8 2_4 9 3 10
drop_index_duplicates
- dramkit.gentools.drop_index_duplicates(df, keep='first')
删除
df (pandas.DataFrame)
中index重复的记录
count_values
- dramkit.gentools.count_values(df, cols=None)
计算df列中cols指定列的值出现次数
count_index
- dramkit.gentools.count_index(df)
计算df的每个index出现的次数
get_all_turns
- dramkit.gentools.get_all_turns(series)
获取序列series中所有的转折点,返回series中-1位低点,1位高点
group_shift
- dramkit.gentools.group_shift()
分组shift,待实现
rolling_corr
- dramkit.gentools.rolling_corr(df, col1, col2, *args, **kwargs)
滚动相关性(2个变量)
group_fillna
- dramkit.gentools.group_fillna(df, col_fill, cols_groupby, return_all=False, **kwargs_fillna)
分组缺失值填充
group_rank
- dramkit.gentools.group_rank(df, col_rank, cols_groupby, return_all=False, **kwargs_rank)
分组排序
对df(pandas.DataFrame)中
cols_rank
指定列按cols_groupby
指定列分组排序Todo
col_rank可为列表,此时返回dataframe,return_all设置只返回指定列结果或者返回全部dataframe
- Parameters:
df (pandas.DataFrame) – 待排序数据表
col_rank (str) – 需要排序的列
cols_groupby (str, list) – 分组依据列
**kwargs_rank – pandas中rank函数接受的参数
- Returns:
series_rank – 排序结果
- Return type:
pandas.Series
get_func_df_concat
- dramkit.gentools.get_func_df_concat(func, args_kwargs_list, concat_axis=0)
- func(args, **kwargs)作用于args_kwargs_list中的参数,将所有结果concat返回func函数返回值必须为pd.DataFrame
Examples
>>> func = lambda x, y, z: pd.DataFrame({'a': [x, y, z]}) >>> args_kwargs_list = [[(1, 2), {'z': 3}], ... [(2, 3), {'z': 4}]] >>> get_func_df_concat(func, args_kwargs_list) a 0 1 1 2 2 3 0 2 1 3 2 4
df_groupby_func
- dramkit.gentools.df_groupby_func(df, by_cols, func, as_index=False, group_keys=False, reset_index='drop', *args_func, **kwargs_func)
Todo
by_cols中有nan时的检查和处理
group_keys和as_index的异同检查确认
df按by_cols分组作用于func(x, *args, **kwargs)函数reset_index:若为drop,则重置返回的index若为False,则不处理groupby之后的index若为ori,如返回数据与原始df行数相同,则使用原始index,否则不处理groupby之后的index解决groupby.apply出现执行多次的问题(貌似pandas更新到1.5.0之后没出现了),参考:
bootstrapping
- dramkit.gentools.bootstrapping()
bootstraping, 待实现
groupby_rolling_func
- dramkit.gentools.groupby_rolling_func(data, cols_groupby, cols_val, func, keep_index=True, kwargs_rolling={}, kwargs_func={})
data按照cols_groupby分组,然后在cols_val列上rolling调用func, func的参数通过kwargs_func设定, 若cols_val为str,则返回pd.Series;若为list,则返回pd.DataFrame 若keep_index为True,则返回结果中的index与data一样,否则返回结果中的index由 cols_groupby设定
merge_dicts
- dramkit.gentools.merge_dicts(dicts)
将多个字典合并成一个字典
link_lists
- dramkit.gentools.link_lists(lists)
- 将多个列表连接成一个列表注:lists为列表,其每个元素也为一个列表
Examples
>>> a = [1, 2, 3] >>> b = [4, 5, 6] >>> c = ['a', 'b'] >>> d = [a, b] >>> link_lists([a, b, c, d]) [1, 2, 3, 4, 5, 6, 'a', 'b', [1, 2, 3], [4, 5, 6]]
get_lists_inter
- dramkit.gentools.get_lists_inter(lists)
获取多个列表的交集
list_eq
- dramkit.gentools.list_eq(l1, l2, order=True)
判断两个列表l1和l2是否相等,若orde为False,则只要元素相同即判断为相等
get_num_decimal
- dramkit.gentools.get_num_decimal(x, ignore_tail0=True)
- 获取浮点数x的小数位数ignore_tail0设置是否忽略小数点尾部无效的0
list2dict
- dramkit.gentools.list2dict(list_key, list_val)
两个列表生成字典
sort_dict
- dramkit.gentools.sort_dict(d, by='key', reverse=False)
对字典排序,by设置依据’key’还是’value’排,reverse同sorted函数参数
insert_into_list
- dramkit.gentools.insert_into_list(l, val, loc_val, dirt='before')
在列表l中的loc_val前或后插入val
count_list
- dramkit.gentools.count_list(l)
对l中的元素计数,返回dict
tmprint
- dramkit.gentools.tmprint(*args, **kwargs)
url2chn
- dramkit.gentools.url2chn(url)
将url中的中文乱码(实际上为utf-8)转化为正常url
replace_con_blank
- dramkit.gentools.replace_con_blank(string, to)
将string中连续空格替换为to指定字符
change_dict_key
- dramkit.gentools.change_dict_key(d, func_key_map)
修改字典d中的key值,即d[k]=v变成d[func_key_map(k)]=v
get_func_arg_info
- dramkit.gentools.get_func_arg_info(func)
获取函数func的参数信息
Examples
>>> def func(a:int, b, c:float=1.0, c1=2, *args, ... e, d=2, **kwargs): ... pass >>> get_func_arg_info(func) {'args': ['a', 'b', 'c', 'c1'], 'argdefaults': {'c': 1.0, 'c1': 2}, 'varargs': 'args', 'varkw': 'kwargs', 'kwonlyargs': ['e', 'd'], 'kwonlydefaults': {'d': 2}, 'annotations': {'a': int, 'c': float}}
parse_args
- dramkit.gentools.parse_args(args_info, description=None)
命令行参数解析
- Parameters:
args_info (list, dict) –
参数信息若无分组,格式为:[([‘-a’, ‘–arg1’, …], {‘type’: int, ‘default’: 1, …}),([‘-b’, …], {…}),…]若有分组,格式为:{‘groupname1’:[([‘-a’, ‘–arg1’, …], {‘type’: int, ‘default’: 1, …}),([‘-b’, …], {…}),…],’groupname2’: […], …}
gen_args_praser
- dramkit.gentools.gen_args_praser(func, native_eval_types=('list', 'tuple', 'dict'))
生成func函数的参数解析
get_topn_names
- dramkit.gentools.get_topn_names(df, n=1, max_or_min='max', col_or_row='col')
获取df中值最大/小的前n列/行的名称
Examples
>>> d = {'A': [30, 2, 6, 4, 5], ... 'B': [6, 7, 80, 5, 10], ... 'C': [11, 12, 13, 14, 15], ... 'D': [16, 17, 18, 19, 20], ... 'E': [21, 22, 23, 24, 25]} >>> df = pd.DataFrame(d, index=['a', 'b', 'c', 'd', 'e']) >>> res = get_topn_names(df, n=2) >>> res1 = get_topn_names(df, n=2, col_or_row='row')
install_check
install_check
- dramkit.install_check.install_check()
检查是否成功安装dramkit
若成功安装,会打印版本号和相关提示信息
iotools
GetInputTimeoutError
- class dramkit.iotools.GetInputTimeoutError
Bases:
Exception
get_input_timeout_win
- dramkit.iotools.get_input_timeout_win(timeout=10, hint_str=None)
- windows下设置在等待时间内获取input
get_input_timeout_notwin
- dramkit.iotools.get_input_timeout_notwin(timeout=10, hint_str=None)
- 非windows下设置在等待时间内获取input
get_input_timeout
- dramkit.iotools.get_input_timeout(timeout=10, hint_str=None, logger=None)
get_input_with_timeout
- dramkit.iotools.get_input_with_timeout(timeout=10, hint_str=None, logger=None)
- 通过input函数获取输入若等待时间超过timeout秒没接收到输入,则返回Nonehint_str为input函数提示信息
get_input_multi_line
- dramkit.iotools.get_input_multi_line(end_word='end', end_char='', hint_str=None, hint_lineno=True)
- input读取多行输入(包括回车换行符也会被保留)end_word设置结束词,当在一行里面输入完整结束词,则表示输入完毕注:若end_word设置为’’,则空行输入回车时会结束输入, 此时返回结果中上下行之间回车换行符不会被保留end_char设置结束字符串,当在一行里面任意位置输入end_char,则表示输入完毕, 此时返回在end_char之前输入的内容(不包括end_char和end_char之后的内容)hint_str设置input函数中的输入提示若hint_lineno为True,则input函数输入时提示当前是第几行
pickle_file
- dramkit.iotools.pickle_file(data, file)
以二进制格式保存数据data到文件file
- Parameters:
data – 待保存内容
file (str) – 保存路径
unpickle_file
- dramkit.iotools.unpickle_file(file)
读取二进制格式文件file
- Parameters:
file (str) – 待读取文件路径
load_yml
- dramkit.iotools.load_yml(fpath, **kwargs_open)
write_yml
- dramkit.iotools.write_yml()
写入yml文件
load_json
- dramkit.iotools.load_json(fpath, encoding=None, logger=None)
读取json格式文件
- Parameters:
fpath (str) – 待读取文件路径
encoding (str, None) – 文件编码格式,若不指定,则尝试用utf-8和gbk编码读取
logger (logging.Logger) – 日志记录器
- Returns:
dict - 返回读取数据
write_json
- dramkit.iotools.write_json(data, fpath, encoding=None, mode='w', **kwargs)
把data写入json格式文件
- Parameters:
data (dict) – 待写入数据
fpath (str) – 文件保存路径
encoding (str, None) – 文件编码格式
read_lines
- dramkit.iotools.read_lines(fpath, encoding=None, split=True, logger=None, **kwargs_open)
读取文本文件中的所有行
- Parameters:
fpath (str) – 待读取文件路径
encoding (str, None) – 文件编码格式,若不指定,则尝试用utf-8和gbk编码读取
logger (None, logging.Logger) – 日志记录器
- Returns:
list - 文本文件中每行内容列表
write_txt
- dramkit.iotools.write_txt(lines, file, mode='w', check_end=True, **kwargs)
将列表lines中的内容写入文本文件中
- Parameters:
lines (list) – 列表,每个元素为一行文本内容
file (str) – 保存路径
mode (str) – 写入模式,如’w’或’a’
check_end (bool) – 是否检查每行行尾换行符,默认若行尾已经有换行符,则不再新添加换行符
**kwargs –
open
函数接受的关键字,如encoding等
Note
不同系统文本文件每行默认结尾字符不同:
linux下一般以`\n`结尾
windows下一般以`\r\n`结尾
苹果系统一般以`\r`结尾
load_text
- dramkit.iotools.load_text(fpath, sep=',', del_first_line=False, del_first_col=False, to_pd=True, keep_header=True, encoding=None, del_last_col=False, logger=None)
读取文本文件数据,要求文件中每行存放一个数据样本
- Parameters:
fpath (str) – 文本文件路径
sep (str) – 字段分隔符,默认`,`
del_first_line (bool) –
是否删除首行,默认不删除
Note
若del_first_line为True,则输出pandas.DataFrame没有列名
del_first_col (bool) – 是否删除首列,默认不删除
to_pd (bool) – 是否输出为pandas.DataFrame,默认是
keep_header (bool) – 输出为pandas.DataFrame时是否以首行作为列名,默认是
encoding (str, None) – 指定编码方式,默认不指定时会尝试以uft-8和gbk编码读取
del_last_col (bool) – 是否删除最后一列,默认否
logger (logging.Logger, None) – 日志记录器
- Returns:
list, pandas.DataFrame - 返回读取的数据
df2file_pd
- dramkit.iotools.df2file_pd(df, fpath, ftype=None, **kwargs)
pandas.DataFrame写入文件
process_df_pd_big
- dramkit.iotools.process_df_pd_big(fpath: str, ftype: Optional[str] = None, chunksize: int = 100000, del_unname_cols: bool = True, logger: Optional[Logger] = None, read_kwargs: dict = {}, process_func=None, process_args: tuple = (), process_kwargs: dict = {}, return_df: bool = True)
用pandas读取大数据文件并分批处理,返回处理后的结果
load_df_pd
- dramkit.iotools.load_df_pd(fpath: str, ftype: Optional[str] = None, del_unname_cols: bool = True, encoding: Optional[str] = None, logger: Optional[Logger] = None, **kwargs)
用pandas读取文件,返回DataFrame
load_csv
- dramkit.iotools.load_csv(fpath, del_unname_cols=True, encoding=None, logger=None, **kwargs)
用pandas读取csv数据
- Parameters:
fpath (str) – csv文件路径
del_unname_cols (bool) – 是否删除未命名列,默认删除
encoding (str, None) – 指定编码方式,默认不指定,不指定时会尝试以uft-8和gbk编码读取
logger (logging.Logger, None) – 日志记录器
**kwargs – 其它
pd.read_csv
支持的参数
- Returns:
pandas.DataFrame - 读取的数据
load_dfs_pd
- dramkit.iotools.load_dfs_pd(fpaths, kwargs_sort={}, kwargs_drop_dup={}, mark_file=False, **kwargs_loaddf)
读取指定路径列表中所有的dataframe数据文件,整合到一个df里面
- Parameters:
fpaths (list) – 文件夹路径列表
kwargs_sort (dict) – 设置sort_values接受的排序参数
kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数
**kwargs_loaddf –
dramkit.iotools.load_df_pd()
接受的其它参数
- Returns:
pandas.DataFrame - 读取的数据
load_csvs
- dramkit.iotools.load_csvs(fpaths, kwargs_sort={}, kwargs_drop_dup={}, **kwargs_loadcsv)
读取指定路径列表中所有的csv文件,整合到一个df里面
- Parameters:
fpaths (list) – 文件夹路径列表
kwargs_sort (dict) – 设置sort_values接受的排序参数
kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数
**kwargs_loadcsv –
dramkit.iotools.load_dfs_pd()
接受的其它参数
- Returns:
pandas.DataFrame - 读取的数据
load_csvs_dir
- dramkit.iotools.load_csvs_dir(fdir, kwargs_sort={}, kwargs_drop_dup={}, **kwargs_loadcsv)
读取指定文件夹中所有的csv文件,整合到一个df里面
- Parameters:
fdir (str) – 文件夹路径
kwargs_sort (dict) – 设置sort_values接受的排序参数
kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数
**kwargs_loadcsv –
dramkit.iotools.load_dfs_pd()
接受的其它参数
- Returns:
pandas.DataFrame - 读取的数据
load_excels
- dramkit.iotools.load_excels(fdirorfpaths, kwargs_sort={}, kwargs_drop_dup={}, **kwargs_readexcel)
读取指定文件夹中所有的excel文件,整合到一个df里面
- Parameters:
fdirorfpaths (str, list) – Excel文件所在文件夹路径或Excel文件路径列表
kwargs_sort (dict) – 设置sort_values接受的排序参数
kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数
**kwargs_readexcel –
dramkit.iotools.load_dfs_pd
接受的其它参数
- Returns:
pandas.DataFrame - 读取的数据
archive_df
- dramkit.iotools.archive_df(df_old, df_new, idcols=None, del_dup_cols=None, rep_keep='new', sort_cols=None, ascendings=True, old_ftype=None, new_ftype=None, save_ftype=None, save_path=None, kwargs_read_old={}, kwargs_read_new={}, kwargs_save={'index': None}, method='merge', logger=None)
合并df_new和df_old,再排序、去重、写入csv或excel
add_ext_to_filename
- dramkit.iotools.add_ext_to_filename(fpath, ext)
在fpath路径的尾部(扩展名的前面)添加尾缀ext
cut_dffile_by_maxline
- dramkit.iotools.cut_dffile_by_maxline(fpath, max_line=10000, kwargs_loaddf={}, kwargs_tofile={'index': None})
get_fpath_ext_num
- dramkit.iotools.get_fpath_ext_num(fpath)
- 给定路径fpath,若不存在,则返回fpath,若存在则返回带数字在后缀的路径比如fpath=xxx.y存在,则返回xxx_1.y,xxx_1.y也存在,则返回xxx_2.y
cut_bigdffile_by_maxline
- dramkit.iotools.cut_bigdffile_by_maxline(fpath, max_line=100000, kwargs_loaddf={}, kwargs_tofile={'index': None})
cut_dffile_by_year
- dramkit.iotools.cut_dffile_by_year(fpath, tcol=None, name_last_year=False, kwargs_loaddf={}, kwargs_tofile={'index': None}, kwargs_exists_merge={}, logger=None)
dataframe大文件分割,按年份,tcol指定时间列
Todo
指定几年的合并为一个文件,而不是固定每一年单独一个文件
cut_csv_by_year
- dramkit.iotools.cut_csv_by_year(fpath, tcol=None, name_last_year=False, kwargs_loadcsv={}, kwargs_tocsv={'index': None}, logger=None)
csv大文件分割,按年份,tcol指定时间列
clear_text_file
- dramkit.iotools.clear_text_file(fpath, encoding=None)
清空文本文件中的内容,保留文件
clear_specified_type_files
- dramkit.iotools.clear_specified_type_files(dir_path, type_list, encoding=None, recu_sub_dir=False, **kwargs_getpath)
清空dir_path文件夹下所有类型在type_list中的文件内容,保留文件夹
get_all_paths
- dramkit.iotools.get_all_paths(dir_path, ext=None, start=None, include_dir=False, abspath=False, recu_sub_dir=True, only_dir=False, name_func=None)
获取指定文件夹(及其子文件夹)中所有的文件路径
- Parameters:
dir_path (str) – 文件夹路径
ext (None, str, list, tuple) – 指定文件后缀列表,若为None,则包含所有类型文件
start (None, str, list, tuple) – 指定文件前缀名列表,若为None,则包含所有类型文件
include_dir (bool) – 返回结果中是否包含文件夹路径(包含返回文件路径的文件夹),默认不包含(即只返回文件路径)
abspath (bool) – 是否返回绝对路径,默认返回相对路径
rec_sub_dir (bool) – 若为False,则不对子文件中的文件进行遍历,即只返回dir_path下的文件(夹)路径
only_dir (bool) – 若为True,则只返回子文件夹路径(文件夹名称尾部与ext指定尾部相同的文件夹), 不返回文件路径,此时include_dir不起作用
name_func (function) – 判断是否符合文件名筛选的函数,即满足name_func(fpath)值为True的路径被保留, name_func参数为路径,返回结果只能为True或False
- Returns:
list - 文件路径列表
del_specified_type_files
- dramkit.iotools.del_specified_type_files(dir_path, type_list, recu_sub_dir=True, **kwargs_getpath)
删除dir_path文件夹下所有类型在type_list中的文件
del_dir
- dramkit.iotools.del_dir(dir_path)
删除dir_path指定文件夹及其所有内容
del_specified_subdir
- dramkit.iotools.del_specified_subdir(dir_path, del_names, recu_sub_dir=True, **kwargs_getpath)
删除dir_path文件夹下所有文件夹名尾缀在del_names中的子文件夹
copy_file_to_dir
- dramkit.iotools.copy_file_to_dir(src_path, tgt_dir, force=True)
复制文件
copy_file_to_file
- dramkit.iotools.copy_file_to_file(src_path, tgt_path, force=True)
文件复制(指定文件名)
copy_dir_structure
- dramkit.iotools.copy_dir_structure(src_dir, tgt_dir, ext=None, recu_sub_dir=True, keep_root_same=True, return_map=False, **kwargs_getpath)
复制文件夹结构,不复制里面的文件
copy_dir
- dramkit.iotools.copy_dir(src_dir, tgt_dir, ext_file=None, ext_dir=None, recu_sub_dir=True, force=True, keep_root_same=True, keep_empty_dir_when_not_recu=False, kwargs_getpath_dir={}, kwargs_getpath_file={})
复制整个文件夹及其内容
move_file_to_file
- dramkit.iotools.move_file_to_file(src_path, tgt_path, force=True)
移动文件(指定文件名)
move_file_to_dir
- dramkit.iotools.move_file_to_dir(src_path, tgt_dir, force=True)
移动文件
move_dir
- dramkit.iotools.move_dir(src_dir, tgt_dir, ext_file=None, ext_dir=None, recu_sub_dir=True, force=True, keep_root_same=True, keep_empty_dir_when_not_recu=False, kwargs_getpath_dir={}, kwargs_getpath_file={})
移动整个文件夹及其内容
move_specified_type_files
- dramkit.iotools.move_specified_type_files(dir_path, type_list, target_dir, recu_sub_dir=True)
移动dir_path文件夹下所有类型在type_list中的文件到target_dir中
make_dir
- dramkit.iotools.make_dir(dir_path)
新建文件夹
make_file_dir
- dramkit.iotools.make_file_dir(fpath)
若路径fpath所指文件夹不存在,创建之
make_path_dir
- dramkit.iotools.make_path_dir(fpath)
若fpath所指文件夹路径不存在,则新建之
get_last_change_time
- dramkit.iotools.get_last_change_time(fpath, strformat='%Y-%m-%d %H:%M:%S')
获取文件的最后修改时间
get_file_info
- dramkit.iotools.get_file_info(fpath)
获取文件信息
get_files_info
- dramkit.iotools.get_files_info(fpaths)
获取列表中的文件信息
get_files_info_dir
- dramkit.iotools.get_files_info_dir(dir_path, **kwargs_getpath)
- 获取dir_path文件夹中所有文件的信息**kwargs为
dramkit.iotoos.get_all_paths()
接受的参数
py2pyc
- dramkit.iotools.py2pyc(py_path, pyc_path=None, force=True, del_py=False, **kwargs_compile)
py文件编译为pyc文件
py2pyc_dir
- dramkit.iotools.py2pyc_dir(dir_path, force=True, del_py=False, recu_sub_dir=True, kwargs_compile={}, kwargs_getpath={})
将一个文件夹下的所有.py文件编译为.pyc文件
pyc2py
- dramkit.iotools.pyc2py(pyc_path, py_path=None, force=True, del_pyc=False, logger=None)
pyc文件反编译为py
pyc2py_dir
- dramkit.iotools.pyc2py_dir(dir_path, force=True, del_pyc=False, recu_sub_dir=True, logger=None, **kwargs_getpath)
将一个文件夹下的所有.pyc文件反编译为.py文件
get_mac_address
- dramkit.iotools.get_mac_address()
获取Mac地址,返回大写地址,如:F8-A2-D6-CC-BB-AA
get_ipconfig_all_win
- dramkit.iotools.get_ipconfig_all_win(logger=False)
get_ipconfig_win
- dramkit.iotools.get_ipconfig_win(logger=False)
get_hardware_ids
- dramkit.iotools.get_hardware_ids()
- 获取电脑硬件序列号信息,包括主板和硬盘序列号、MAC地址、BIOS序列号等
get_ip1
- dramkit.iotools.get_ip1()
get_ip2
- dramkit.iotools.get_ip2()
get_ip_public
- dramkit.iotools.get_ip_public()
- 获取公网ip
get_ip_public2
- dramkit.iotools.get_ip_public2()
get_tasks_info_win
- dramkit.iotools.get_tasks_info_win()
windows下获取任务信息
get_ports_info_win
- dramkit.iotools.get_ports_info_win()
windows下获取端口信息
zip_files
- dramkit.iotools.zip_files(zip_path, fpaths, keep_ori_path=True, keep_zip_new=True)
使用zipfile将指定路径列表(不包括子文件(夹)内容)打包为.zip文件
- Parameters:
zip_path (str) – zip压缩包保存路径
fpaths (list) – 需要压缩的路径列表(应为相对路径)
keep_ori_path (bool) –
若为True, 则压缩文件会保留fpaths中文件的原始路径
若为False, 则fpaths中所有文件在压缩文件中都在统一根目录下
keep_zip_new (bool) – 若为True,将覆盖已有压缩文件,否则在已有文件里面新增
zip_fpath
- dramkit.iotools.zip_fpath(fpath, zip_path=None, kwargs_zip={}, kwargs_getpath={})
使用zipfile压缩单个文件(夹)下所有内容为.zip文件
- Parameters:
fpath (str) – 待压缩文件(夹)路径(应为相对路径)
zip_path (None, str) – 压缩文件保存路径,若为None,则为fpath路径加后缀
**kwargs_zip –
dramkit.iotools.zip_files()
接受的参数
zip_fpaths
- dramkit.iotools.zip_fpaths(zip_path, fpaths, kwargs_zip={}, kwargs_getpath={})
使用zipfile将指定路径列表(包括子文件(夹)所有内容)打包为.zip文件
- Parameters:
zip_path (str) – zip压缩包保存路径
fpaths (list) – 需要压缩的路径列表(可为文件也可为文件夹, 应为相对路径)
**kwargs_zip –
dramkit.iotools.zip_files()
接受的参数
zip_extract
- dramkit.iotools.zip_extract(fzip, to_dir=None, replace_exists=True)
用zipfile解压文件
zip_fpath_7z
- dramkit.iotools.zip_fpath_7z(fpath, zip_path=None, mode='zip', pwd=None, keep_zip_new=True)
7z命令压缩单个文件(夹)到.zip文件
- Parameters:
fpath (str) – 待压缩文件(夹)路径
zip_path (None, str) – 压缩文件保存路径,若为None,则为fpath路径加后缀
mode (str) – 压缩文件后缀,可选[‘7z’, ‘zip’]
pwd (str) – 密码字符串
keep_zip_new (bool) –
若为True,则zip_path将覆盖原来已经存在的文件
若为False,则zip_path将在原来已有的文件中新增需要压缩的文件
zip_fpaths_7z
- dramkit.iotools.zip_fpaths_7z(zip_path, fpaths, mode='zip', pwd=None, keep_zip_new=True)
7z命令压缩多个文件(夹)列表到.zip文件
- Parameters:
zip_path (str) – zip压缩包保存路径
fpaths (list) –
待压缩文件(夹)路径列表
Warning
fpaths太长的时候可能会报错
mode (str) – 压缩文件后缀,可选[‘7z’, ‘zip’]
pwd (str) – 密码字符串
keep_zip_new (bool) –
若为True,则zip_path将覆盖原来已经存在的文件
若为False,则zip_path将在原来已有的文件中新增需要压缩的文件
extract_7z
- dramkit.iotools.extract_7z()
7z命令解压文件
cmdrun
- dramkit.iotools.cmdrun(cmd_str, logger=None)
调用cmd执行cmd_str命令
cmdruns_multi_process
- dramkit.iotools.cmdruns_multi_process(cmd_str_list, logger=None, multi_line=None, keep_order=True)
cmd_run_pys_multi_process
- dramkit.iotools.cmd_run_pys_multi_process(py_list, logger=None, multi_line=None, keep_order=True)
cmd_run_pys
- dramkit.iotools.cmd_run_pys(py_list, logger=None)
cmd命令批量运行py脚本,logger捕捉日志
rename_files_in_dir
- dramkit.iotools.rename_files_in_dir(dir_path, func_rename)
对指定文件夹中的文件进行批量重命名
- Parameters:
dir_path (str) – 目标文件夹
func_rename (function) – 命名规则函数: name_new = func_rename(name)
find_files_include_str
- dramkit.iotools.find_files_include_str(target_str, fpaths, re_match=False, return_all_find=False, logger=None)
在指定文件路径列表中,查找哪些文件里面包含了目标字符串
- Parameters:
target_str (str) – 目标字符串
fpaths (str, list) – 文件路径列表
re_match (bool) – 若为True,则目标字符串
target_str
按正则表达式处理return_all_find (bool) – 若为True,则返回所有找到的目标字符串,否则只返回第一个
logger (None, logging.Logger) – 日志记录器
- Returns:
dict - 返回dict, key为找到的文件路径,value为包含目标字符串的文本内容(仅第一次出现的位置)
find_dir_include_str
- dramkit.iotools.find_dir_include_str(target_str, root_dir=None, file_types=None, re_match=False, return_all_find=False, logger=None, **kwargs_getpath)
在指定目录下的文件中,查找哪些文件里面包含了目标字符串
Todo
增加排除文件类型设置
- Parameters:
target_str (str) – 目标字符串
root_dir (str, None) – 目标文件夹,目标字符串在此文件夹及其所有子文件内所有文本文件中搜索 若为None,则在os.getcwd()下搜索
file_types (None, str, list) –
指定查找的文件后缀范围:
None, 在所有文件中查找
str, 指定一类文件后缀, 如’.py’表示在Python脚本中查找
list, 指定一个后缀列表, 如[‘.py’, ‘.txt’]
re_match (bool) – 若为True,则目标字符串
target_str
按正则表达式处理return_all_find (bool) – 若为True,则返回所有找到的目标字符串,否则只返回第一个
logger (None, logging.Logger) – 日志记录器
- Returns:
dict - 返回dict, key为找到的文件路径,value为包含目标字符串的文本内容(仅第一次出现的位置)
replace_str_in_file
- dramkit.iotools.replace_str_in_file(fpath, ori_str, new_str, fpath_new=None, kwargs_read={}, kwargs_write={})
- 替换文本文件fpath中的ori_str为new_strfpath_new指定新文件路径若fpath_new为None,则新文件在原文件名加后缀_new若fpath_new为’replace’,则新文件替换原文件若fpath_new是函数,则新文件璐姐为fpath_new(fpath)kwargs_read为read_lines函数接收参数keargs_write为write_txt函数接收参数
replace_str_in_files
- dramkit.iotools.replace_str_in_files(fpaths, ori_str, new_str, fpath_new=None, kwargs_read={}, kwargs_write={})
文本文件内容批量替换,参数见
replace_str_in_file()
get_pip_pkgs_win_deprecated
- dramkit.iotools.get_pip_pkgs_win_deprecated()
获取windows下pip安装包列表
get_pip_pkgs_win
- dramkit.iotools.get_pip_pkgs_win()
获取windows下pip安装包列表
get_filedir
- dramkit.iotools.get_filedir(fpath)
获取fpath路径所在文件夹路径
get_filename
- dramkit.iotools.get_filename(fpath, with_ext=True)
获取文件名(去除前缀路径),with_ext为False则返回不带后缀
get_file_ext_type
- dramkit.iotools.get_file_ext_type(fpath)
提取文件扩展名
get_parent_path
- dramkit.iotools.get_parent_path(path, n=1)
获取路径path的n级父路径