dramkit

cryptotools

expand

dramkit.cryptotools.expand(text, baselen=16, encoding='utf-8')
如果text不足minlen位的倍数就用空格补足
注:utf-8编码中一个汉字占3个字节

en_aes_cbc

dramkit.cryptotools.en_aes_cbc(text, key=None, iv=None, encoding='utf-8')

AES CBC加密

de_aes_cbc

dramkit.cryptotools.de_aes_cbc(text, key=None, iv=None, encoding='utf-8')

AEC CBC解密

en_aes_ecb

dramkit.cryptotools.en_aes_ecb(text, key=None, encoding='utf-8')

AES ECB加密

de_aes_ecb

dramkit.cryptotools.de_aes_ecb(text, key=None, encoding='utf-8')

AES ECB加密

en_base64

dramkit.cryptotools.en_base64(text, encoding='utf-8')

base64加密

de_base64

dramkit.cryptotools.de_base64(text, encoding='utf-8')

base64解密

en_base64_url

dramkit.cryptotools.en_base64_url(text, encoding='utf-8')

base64加密

de_base64_url

dramkit.cryptotools.de_base64_url(text, encoding='utf-8')

base64解密

gen_random_token

dramkit.cryptotools.gen_random_token(length=64, spec_chars=False, rand_seed=None)

gen_random_tokens

dramkit.cryptotools.gen_random_tokens(n, **kwargs)

datetimetools

pd_str2datetime

dramkit.datetimetools.pd_str2datetime(o, **kwargs)

pd.to_datetime函数封装,排除o为时间戳(统一按str处理)

datetime2str

dramkit.datetimetools.datetime2str(dt, strformat=None, tz='local')

datatime格式转为str格式

Parameters:
  • dt (datetime.datetime, datetime.date, time.struct_time, pd.Timestamp) – datetime、time或pandas格式时间

  • strformat (None, bool, str) –

    设置返回文本格式:

    • False, 直接返回dt

    • ’timestamp’, 返回timestamp

    • ’int’, 返回int格式日期,如’20221106’

    • None, 默认’%Y-%m-%d %H:%M:%S’格式

    • 指定其他文本格式

  • tz (str) – 指定时区,可选[‘local’, ‘utc’]

Examples

>>> dt1, dt2 = datetime.datetime.now(), time.localtime()
>>> dt3 = pd_str2datetime(datetime_now())
>>> datetime2str(dt1, 'timestamp', 'utc')
>>> datetime2str(dt2, 'timestamp', 'utc')
>>> datetime2str(dt3, 'timestamp', 'utc')
>>> dt4 = time.gmtime()
>>> datetime2str(dt3, 'timestamp', 'utc')
>>> datetime2str(dt3, 'timestamp')

dtseries2str

dramkit.datetimetools.dtseries2str(series, joiner='-', strformat=None)

pd.Series转化为str,若不指定strformat,则按joiner连接年月日

get_datetime_strformat

dramkit.datetimetools.get_datetime_strformat(tstr, ymd=['-', '', '.', '/'], ymd_hms=[' ', '', '.'], hms=[':', ''], hms_ms=['.', ''])
获取常用的文本时间格式format,支持包含以下几个部分的时间类型:
年月日、年月日时分秒、年月日时分、年月日时分秒毫秒、年月日时、年月

Todo

改为用正则表达式匹配(根据连接符、数字占位限制和数字大小限制来匹配)

Parameters:
  • ymd (list) – 年月日连接符

  • ymd_hms (list) – 年月日-时分秒连接符

  • hms (list) – 时分秒连接符

  • hms_ms (list) – 时分秒-毫秒连接符

str2datetime

dramkit.datetimetools.str2datetime(tstr, strformat=None)

时间字符串转datetime格式

str2timestamp

dramkit.datetimetools.str2timestamp(t, strformat=None, tz='local')
时间字符串转时间戳
若大于16位(纳秒级),返回string,否则返回float(毫秒级)

timestamp2str

dramkit.datetimetools.timestamp2str(t, strformat=None, tz='local', method=2)
时间戳转化为字符串格式
注意:t小于0时直接按t为秒数处理(即使整数部分超过10位也不会处理为小数)

get_datetime_format

dramkit.datetimetools.get_datetime_format(dt, ymd=['-', '', '.', '/'], ymd_hms=[' ', '', '.'], hms=[':', ''], hms_ms=['.', ''])
获取常用日期时间格式format
注:整数和浮点数都可能是时间戳,这里以常用的规则来判断:
- 若整数长度为8位,判断为整数型日期
- 若整数或浮点数长度不小于10位不大于19位,判断为时间戳
- 其他情况下的整数或浮点数报错处理

copy_format0

dramkit.datetimetools.copy_format0(to_tm, from_tm)
复制日期时间格式
若from_tm是日期时间格式或时间戳格式,则直接返回to_tm

x2datetime

dramkit.datetimetools.x2datetime(x, tz='local')
x转化为datetime格式,若x为timestamp,应设置时区tz
若x为8位整数,则转化为str处理,其余情况直接用用pd.to_datetime处理

copy_format

dramkit.datetimetools.copy_format(to_tm, from_tm)

复制日期时间格式

time_add

dramkit.datetimetools.time_add(t, seconds=0, minutes=0, hours=0, days=0)

时间运算,return_format指定返回格式,若为False,返回datetime格式

date_add_nday

dramkit.datetimetools.date_add_nday(date, n=1)

在给定日期date上加上n天(减去时n写成负数即可)

is_datetime

dramkit.datetimetools.is_datetime(x)

判断x是否为时间日期

is_month_end

dramkit.datetimetools.is_month_end(date=None, tz='local')

判断date是否为月末,若date为timestamp,应设置时区tz

is_month_start

dramkit.datetimetools.is_month_start(date=None, tz='local')

判断date是否为月初,若date为timestamp,应设置时区tz

is_quarter_end

dramkit.datetimetools.is_quarter_end(date=None, tz='local')

判断date是否为季末,若date为timestamp,应设置时区tz

is_quarter_start

dramkit.datetimetools.is_quarter_start(date=None, tz='local')

判断date是否为季末,若date为timestamp,应设置时区tz

datetime_now

dramkit.datetimetools.datetime_now(strformat=None)

获取当前时间

today_month

dramkit.datetimetools.today_month(joiner='-')

获取今日所在年月,格式由连接符joiner确定

today_date

dramkit.datetimetools.today_date(joiner='-')

获取今日日期,格式由连接符joiner确定

get_quarter

dramkit.datetimetools.get_quarter(dt=None, joiner='Q')

获取日期所在季度

get_quarter_start_end_by_yq

dramkit.datetimetools.get_quarter_start_end_by_yq(y, q, joiner='-')

get_quarter_start_end_by_date

dramkit.datetimetools.get_quarter_start_end_by_date(date=None)

获取date日期所在季度的起始日期

today_quarter

dramkit.datetimetools.today_quarter(joiner='Q')

获取今日所在季度

get_2part_next

dramkit.datetimetools.get_2part_next(part1, part2, step, n=1)

Example

>>> get_2part_next(2023, 2, 4, 2) # 2023Q2往后推2个季度
(2023, 4)
>>> get_2part_next(2023, 4, 4, 5) # 2023Q4往后推5个季度
(2025, 1)
>>> get_2part_next(2023, 4, 12, 8) # 202304往后推8个月
(2023, 12)
>>> get_2part_next(2023, 9, 12, 10) # 202309往后推10个月
(2024, 7)
>>> get_2part_next(2023, 2, 4, -2) # 2023Q2往前推2个季度
(2022, 4)
>>> get_2part_next(2023, 4, 4, -5) # 2023Q4往前推5个季度
(2022, 3)
>>> get_2part_next(2023, 4, 12, -8) # 202304往前推8个月
(2022, 4)
>>> get_2part_next(2023, 9, 12, -10) # 202309往前推10个月
(2022, 11)

get_pre_quarter

dramkit.datetimetools.get_pre_quarter(date=None, n=1, joiner='Q')
获取date前第n个季度
如:
get_pre_quarter(“20230418”) -> “2023Q1”
get_pre_quarter(“20230418”, 2) -> “2022Q4”
get_pre_quarter(“20230418”, 3) -> “2022Q3”
get_pre_quarter(“20230418”, 4) -> “2022Q2”
get_pre_quarter(“20230418”, 5) -> “2022Q1”
get_pre_quarter(“20230418”, 6) -> “2021Q4”
get_pre_quarter(“20230418”, 7) -> “2021Q3”
get_pre_quarter(“20230418”, 8) -> “2021Q2”

get_dayofweek

dramkit.datetimetools.get_dayofweek(date=None)

返回date属于星期几(1-7)

get_dayofyear

dramkit.datetimetools.get_dayofyear(date=None)

返回date属于一年当中的第几天(从1开始记)

get_month_end

dramkit.datetimetools.get_month_end(date=None)

获取date所在月的月末日期

get_next_nmonth_start_end

dramkit.datetimetools.get_next_nmonth_start_end(date=None, n=1)

get_next_nquarter_start_end

dramkit.datetimetools.get_next_nquarter_start_end(date=None, n=1)

get_date_format

dramkit.datetimetools.get_date_format(date, joiners=[' ', '-', '/', '*', '#', '@', '.', '_'])

判断日期格式位,返回日期位数和连接符

Parameters:
  • date (str) – 表示日期的字符串

  • joiners (list) –

    支持的格式连接符,不支持的连接符可能报错或返回’未知日期格式’

    Note

    注:若date无格式连接符,则只判断8位的格式,如’20200220’

Returns:

tuple - (日期位数(可能为date字符串长度或’未知日期格式’), 格式连接符(或None))

date_reformat_simple

dramkit.datetimetools.date_reformat_simple(date, joiner_ori, joiner_new='-')

时间格式简单转化

date_reformat_chn

dramkit.datetimetools.date_reformat_chn(date, joiner='-')

指定连接符为joiner,重新格式化date,date格式为x年x月x日

gen_strformat

dramkit.datetimetools.gen_strformat(joiner='')

date_reformat

dramkit.datetimetools.date_reformat(date, joiner='-')

指定连接符为joiner,重新格式化date

date8_to_date10

dramkit.datetimetools.date8_to_date10(date, joiner='-')

8位日期转换为10位日期,连接符为joiner

date10_to_date8

dramkit.datetimetools.date10_to_date8(date)

10位日期转换为8位日期

diff_time_second

dramkit.datetimetools.diff_time_second(time1, time2)
计算两个时间的差:time1-time2,time1和time2的格式应相同
返回两个时间差的秒数

diff_days_date

dramkit.datetimetools.diff_days_date(date1, date2)

计算两个日期间相隔天数,若date1大于date2,则输出为正,否则为负

get_dates_between

dramkit.datetimetools.get_dates_between(date1, date2, keep1=False, keep2=True, only_workday=False, del_weekend=False, joiner=2)

获取date1到date2之间的日期列表,keep1和keep2设置结果是否保留date1和date2

Note

是否为workday用了chncal包,若chncal库没更新,可能导致结果不准确

get_work_dates_chncal

dramkit.datetimetools.get_work_dates_chncal(start_date, end_date=None)

利用chncal获取指定范围内的工作日列表

get_recent_workday_chncal

dramkit.datetimetools.get_recent_workday_chncal(date=None, dirt='post')

若date为工作日,则返回,否则返回下一个(post)或上一个(pre)工作日

Note

若chncal库没更新,可能导致结果不准确

get_next_nth_workday_chncal

dramkit.datetimetools.get_next_nth_workday_chncal(date=None, n=1)
给定日期date,返回其后第n个工作日日期,n可为负数(返回结果在date之前)
若n为0,直接返回date

Note

若chncal库没更新,可能导致结果不准确

get_date_weekday

dramkit.datetimetools.get_date_weekday(weekday, n_week=0, joiner='')

获取指定星期的日期

Parameters:
  • weekday (int) – 指定星期,如5表示星期5,7表示星期天

  • n_week (int) – 表示距离现在第几个星期,0表示当前星期,大于0表示往后推,小于0表示往前推

Examples

>>> get_date_weekday(3) # 获取本周三的日期
>>> get_date_weekday(6, -1) # 获取上周六的日期
>>> get_date_weekday(7, -1) # 获取下周日的日期

get_recent_inweekday

dramkit.datetimetools.get_recent_inweekday(date=None, dirt='post')

若date不是周末,则返回date,否则返回下一个(post)或上一个(pre)周内日期

get_next_nth_inweekday

dramkit.datetimetools.get_next_nth_inweekday(date=None, n=1)
给定日期date,返回其后第n个周内日日期,n可为负数(返回结果在date之前)
若n为0,直接返回date

cut_date

dramkit.datetimetools.cut_date(start_date, end_date, n=500)

以间隔为n天划分start_date和end_date之间日期子集

Examples

>>> start_date, end_date, n = '20200201', '20200225', 10
>>> cut_date(start_date, end_date, n)
[['20200201', '20200210'],
 ['20200211', '20200220'],
 ['20200221', '20200225']]

diff_workdays_date

dramkit.datetimetools.diff_workdays_date(date1, date2)

计算两个日期间相隔的工作日天数,若date1大于date2,则输出为正,否则为负

gentools

General toolboxs

TimeRecoder

class dramkit.gentools.TimeRecoder(monotonic=True, logger=None)

Bases: object

运行时间记录器

Examples

>>> tr = TimeRecoder()
>>> time.sleep(5)
>>> tr.useds()
>>> time.sleep(60)
>>> tr.usedm()
>>> time.sleep(5)
>>> tr.used()
now()
used(logger=None)
usedm(logger=None)
useds(logger=None)

DirtModifyError

class dramkit.gentools.DirtModifyError

Bases: Exception

GenObject

class dramkit.gentools.GenObject(dirt_modify=True, **kwargs)

Bases: object

创建一个数据类型,用于存放变量值,
既可以用.key调用,也可以像字典一样调用[key]调用
__init__(dirt_modify=True, **kwargs)

初始化

clear()

清空所有属性变量

copy()
property dirt_modify
property items
property keys

显示所有key

listitems()
listkeys()
merge(o)

从另一个对象中合并属性和值

pop(key)

删除属性变量key,有返回

remove(key)

删除属性变量key,无返回

set_dirt_modify(dirt_modify)
set_from_dict(d)

通过dict批量增加属性变量

set_key_value(key, value)
update(o)

像dict一样通过update函数传入字典进行更新

DeprecatedError

class dramkit.gentools.DeprecatedError

Bases: Exception

StructureObject

class dramkit.gentools.StructureObject(*args, **kwargs)

Bases: object

run_func_with_timeout_thread

dramkit.gentools.run_func_with_timeout_thread(func, *args, timeout=10, logger_error=False, logger_timeout=None, timeout_show_str=None, kill_when_timeout=True, **kwargs)
限定时间(timeout秒)执行函数func,若限定时间内未执行完毕,返回None
args为tuple或list,为func函数接受的参数列表
注:在线程中强制结束函数可能导致未知错误,
比如文件资源打开了但是强制结束时不能关闭

with_timeout_thread

dramkit.gentools.with_timeout_thread(timeout=30, logger_error=None, logger_timeout=None, timeout_show_str=None, kill_when_timeout=True)
作为装饰器在指定时间timeout(秒)内运行函数,超时则结束运行
通过控制线程实现

Examples

 1import os
 2import pandas as pd
 3from dramkit.gentools import tmprint
 4
 5df1 = pd.DataFrame([[1, 2], [3, 4]])
 6df2 = pd.DataFrame([[5, 6], [7, 8]])
 7df1.to_excel('./_test/with_timeout_thread_test_df.xlsx')
 8TIMEOUT = 3
 9
10@with_timeout_thread(TIMEOUT,
11                     logger_error=False
12                     )
13def func(x):
14    with open('./_test/with_timeout_thread_test_df.xlsx') as f:
15        tmprint('sleeping...')
16        time.sleep(5)
17    df2.to_excel('./_test/with_timeout_thread_test_df.xlsx')
18    return x
19
20def test():
21    res = func('test')
22    print('res:', res)
23    os.remove('./_test/with_timeout_thread_test_df.xlsx')
24    return res
>>> res = test()

try_repeat_run

dramkit.gentools.try_repeat_run(n_max_run=3, logger=None, sleep_seconds=0, force_rep=False)
作为装饰器尝试多次运行指定函数
使用场景:第一次运行可能出错,需要再次运行(比如取数据时第一次可能连接超时,需要再取一次)
Parameters:
  • n_max_run (int) – 最多尝试运行次数

  • logger (None, logging.Logger) – 日志记录器

  • sleep_seconds (int, float) –

    多次执行时,上一次执行完成之后需要暂停的时间(秒)
    注:在force_rep为True时,每次执行完都会暂停,force_rep为False只有报错之后才会暂停

  • force_rep (bool) – 若为True,则不论是否报错都强制重复执行,若为False,则只有报错才会重复执行

Returns:

result – 若目标函数执行成功,则返回执行结果;若失败,则返回None

Return type:

None, other

Examples

 1from dramkit import simple_logger
 2logger = simple_logger()
 3
 4@try_repeat_run(2, logger=logger, sleep_seconds=0, force_rep=False)
 5def rand_div(x):
 6    return x / np.random.randint(-1, 1)
 7
 8def repeat_test(info_):
 9    print(info_)
10    return rand_div(0)
>>> a = repeat_test('repeat test...')
>>> print(a)

capture_print

dramkit.gentools.capture_print(logger=None, del_last_blank=True)

作为装饰器捕获函数中的print内容

Parameters:

logger (None, logging.Logger) – 日志记录器,若为None,则会优先使用被调用函数参数中的logger

Examples

 1from dramkit import simple_logger
 2logger = simple_logger()
 3
 4@capture_print(logger)
 5def test_print():
 6    print('test_print...')
 7
 8@capture_print()
 9def test_print1(logger=None):
10    print('test_print1...')
11
12@capture_print(simple_logger(logname='test'))
13def test_print2(logger=None):
14    print('test_print2...')
15
16@capture_print(False)
17def test_print3(logger=None):
18    print('test_print3...')
>>> test_print()
test_print...
    [INFO: 2023-04-29 10:45:41,671]
>>> test_print1(logger=simple_logger('./_test/capture_print_test.log'))
>>> test_print2(logger=simple_logger('./_test/capture_print_test.log'))
>>> test_print3()

log_used_time

dramkit.gentools.log_used_time(logger=None)

作为装饰器记录函数运行用时

Parameters:

logger (None, logging.Logger) – 日志记录器,若为None,则会优先使用被调用函数参数中的logger

Examples

 1from dramkit import simple_logger
 2logger = simple_logger()
 3
 4@log_used_time(logger)
 5def wait():
 6    print('wait...')
 7    time.sleep(3)
 8
 9@log_used_time()
10def wait1(logger=None):
11    print('wait...')
12    time.sleep(3)
13
14@log_used_time(simple_logger(logname='test'))
15def wait2(logger=None):
16    print('wait...')
17    time.sleep(3)
18
19@log_used_time(False)
20def wait3(logger=None):
21    print('wait...')
22    time.sleep(3)
>>> wait()
wait...
func `wait` run time: 3.0s.
    [INFO: 2023-04-20 13:39:37,292]
>>> wait1(logger=simple_logger('./_test/log_used_time_test.log'))
>>> wait2(logger=simple_logger('./_test/log_used_time_test.log'))
>>> wait3()

References

func_res_process_decorator

dramkit.gentools.func_res_process_decorator(func_res, *args_res, **kwargs_res)

作为装饰器处理函数输出结果

Examples

>>> def res_process(res, k, b):
...    return res * k + b
...
... @func_res_process_decorator(res_process, 2, 5)
... def a(x, y):
...     return x+y
>>> a(2, 3)
15

func_runtime_test

dramkit.gentools.func_runtime_test(func, n=10000, return_all=False, *args, **kwargs)

函数性能(运行时间)测试,n设置测试运行测试

check_list_arg

dramkit.gentools.check_list_arg(arg, allow_none=False)

检查arg,若其不是list或tuple,则转为列表

get_update_kwargs

dramkit.gentools.get_update_kwargs(key, arg, kwargs, arg_default=None, func_update=None)

取出并更新kwargs中key参数的值

使用场景:当一个函数接受**kwargs参数,同时需要对kwargs里面的某个key的值进行更新并且 提取出来单独传参

Parameters:
  • key – kwargs中待取出并更新的关键字

  • arg – 关键字key对应的新值

  • kwargs (dict) – 关键字参数对

  • arg_default (key关键词对应参数默认值) –

  • func_update (None, False, function) –

    自定义取出的key参数值更新函数: arg_new = func_update(arg, arg_old)

    • 若为False, 则不更新,直接取出原来key对应的参数值或默认值

    • 若为`replace`, 则直接替换更新

    • 若为None, 则 默认 更新方式为:

      • 若参数值arg为dict或list类型,则增量更新

      • 若参数值arg_old为list且arg不为list,则增量更新

      • 其它情况直接替换更新

Returns:

  • arg_new – 取出并更新之后的关键字key对应参数值

  • kwargs – 删除key之后的kwargs

Examples

>>> key, arg = 'a', 'aa'
>>> kwargs = {'a': 'a', 'b': 'b'}
>>> get_update_kwargs(key, arg, kwargs)
('aa', {'b': 'b'})
>>> key, arg = 'a', {'a_': 'aa__'}
>>> kwargs = {'a': {'a': 'aa'}, 'b': 'b'}
>>> get_update_kwargs(key, arg, kwargs)
({'a': 'aa', 'a_': 'aa__'}, {'b': 'b'})
>>> key, arg = 'a', ['aa', 'aa_']
>>> kwargs = {'a': ['a'], 'b': 'b'}
>>> get_update_kwargs(key, arg, kwargs)
(['a', 'aa', 'aa_'], {'b': 'b'})
>>> key, arg = 'a', ['aa', 'aa_']
>>> kwargs = {'a': ['a'], 'b': 'b'}
>>> get_update_kwargs(key, arg, kwargs, func_update='replace')
(['aa', 'aa_'], {'b': 'b'})
>>> key, arg = 'a', 'aa_'
>>> kwargs = {'a': ['a'], 'b': 'b'}
>>> get_update_kwargs(key, arg, kwargs)
(['a', 'aa_'], {'b': 'b'})

roulette_base

dramkit.gentools.roulette_base(fitness)

基本轮盘赌法

Parameters:

fitness (list) –

所有备选对象的fitness值列表

Note

fitness的元素值应为正,且fitness值越大,被选中概率越大

Returns:

int - 返回被选中对象的索引号

References

https://blog.csdn.net/armwangEric/article/details/50775206

roulette_stochastic_accept

dramkit.gentools.roulette_stochastic_accept(fitness)

轮盘赌法,随机接受法

Parameters:

fitness (list) –

所有备选对象的fitness值列表

Note

fitness的元素值应为正,且fitness值越大,被选中概率越大

Returns:

int - 返回被选中对象的索引号

References

https://blog.csdn.net/armwangEric/article/details/50775206

roulette_count

dramkit.gentools.roulette_count(fitness, n=10000, rand_func=None)

轮盘赌法n次模拟,返回每个备选对象在n次模拟中被选中的次数

Parameters:
  • fitness (list, dict) –

    所有备选对象的fitness值列表或字典,格式参见Example

    Note

    fitness的元素值应为正,且fitness值越大,被选中概率越大

  • n (int) – 模拟次数

  • rand_func (None, function) –

    指定轮盘赌法函数,如’roulette_base’(dramkit.gentools.roulette_base())
    或’roulette_stochastic_accept’(dramkit.gentools.roulette_stochastic_accept()),
    默认用’roulette_stochastic_accept’

Returns:

list, dict - 返回每个对象在模拟n次中被选中的次数

Examples

>>> fitness = [1, 2, 3]
>>> roulette_count(fitness, n=6000)
[(0, 991), (1, 2022), (2, 2987)]
>>> fitness = (1, 2, 3)
>>> roulette_count(fitness, n=6000)
[(0, 1003), (1, 1991), (2, 3006)]
>>> fitness = [('a', 1), ('b', 2), ('c', 3)]
>>> roulette_count(fitness, n=6000)
[('a', 997), ('b', 1989), ('c', 3014)]
>>> fitness = [['a', 1], ['b', 2], ['c', 3]]
>>> roulette_count(fitness, n=6000)
[('a', 1033), ('b', 1967), ('c', 3000)]
>>> fitness = {'a': 1, 'b': 2, 'c': 3}
>>> roulette_count(fitness, n=6000)
{'a': 988, 'b': 1971, 'c': 3041}

rand_sum

dramkit.gentools.rand_sum(target_sum, n, lowests, highests, isint=True, n_dot=6)

在指定最大最小值范围内随机选取若干个随机数,所选取数之和为定值

Parameters:
  • target_sum (int, float) – 目标和

  • n (int) – 随机选取个数

  • lowests (int, floot, list) – 随机数下限值,若为list,则其第k个元素对应第k个随机数的下限

  • highests (int, floot, list) – 随机数上限值,若为list,则其第k关元素对应第k个随机数的上限

  • isint (bool) –

    所选数是否强制为整数,若为False,则为实数

    Note

    若输入lowests或highests不是int,则isint为True无效

  • n_dot (int) – 动态上下界值与上下限比较时控制小数位数(为了避免python精度问题导致的报错)

Returns:

list - 随机选取的n个数,其和为target_sum

Examples

>>> rand_sum(100, 2, [20, 30], 100)
[65, 35]
>>> rand_sum(100, 2, 20, 100)
[41, 59]
>>> rand_sum(100, 2, [20, 10], [100, 90])
[73, 27]

rand_weight_sum

dramkit.gentools.rand_weight_sum(weight_sum, n, lowests, highests, weights=None, n_dot=6)

在指定最大最小值范围内随机选取若干个随机数,所选取数之加权和为定值

Parameters:
  • weight_sum (float) – 目标加权和

  • n (int) – 随机选取个数

  • lowests (int, floot, list) – 随机数下限值,若为list,则其第k个元素对应第k个随机数的下限

  • highests (int, floot, list) – 随机数上限值,若为list,则其第k关元素对应第k个随机数的上限

  • weights (None, list) –

    权重列表

    Note

    lowests和highests与weights应一一对应

  • n_dot (int) – 动态上下界值与上下限比较时控制小数位数(为了避免python精度问题导致的报错)

Returns:

list - 随机选取的n个数,其以weights为权重的加权和为weight_sum

Examples

>>> rand_weight_sum(60, 2, [20, 30], 100)
[21.41082008017613, 98.58917991982386]
>>> rand_weight_sum(70, 2, 20, 100)
[56.867261610484356, 83.13273838951565]
>>> rand_weight_sum(80, 2, [20, 10], [100, 90])
[80.32071140116187, 79.67928859883813]
>>> rand_weight_sum(80, 2, [20, 10], [100, 90], [0.6, 0.4])
[88.70409567475888, 66.94385648786168]
>>> rand_weight_sum(180, 2, [20, 10], [100, 90], [3, 2])
[23.080418085462018, 55.37937287180697]

replace_repeat_iter

dramkit.gentools.replace_repeat_iter(series, val, val0, gap=None, keep_last=False)

替换序列中重复出现的值

series (pd.Series) 中若步长为gap的范围内出现多个val值,则只保留第一条记录, 后面的替换为val0
若gap为None,则将连续出现的val值只保留第一个,其余替换为val0(这里连续出现val是指 不出现除了val和val0之外的其他值)
若keep_last为True,则连续的保留最后一个

返回结果为替换之后的series (pd.Series)

Examples

>>> data = pd.DataFrame([0, 1, 1, 0, -1, -1, 2, -1, 1, 0, 1, 1, 1, 0, 0,
...                      -1, -1, 0, 0, 1], columns=['test'])
>>> data['test_rep'] = replace_repeat_iter(data['test'], 1, 0, gap=None)
>>> data
    test  test_rep
0      0         0
1      1         1
2      1         0
3      0         0
4     -1        -1
5     -1        -1
6      2         2
7     -1        -1
8      1         1
9      0         0
10     1         0
11     1         0
12     1         0
13     0         0
14     0         0
15    -1        -1
16    -1        -1
17     0         0
18     0         0
19     1         1
>>> series = pd.Series([-1, 1, -1, 0, 1, 0, 1, 1, -1])
>>> replace_repeat_iter(series, 1, 0, gap=5)
0   -1
1    1
2   -1
3    0
4    1
5    0
6    0
7    0
8   -1

replace_repeat_pd

dramkit.gentools.replace_repeat_pd(series, val, val0, keep_last=False)
替换序列中重复出现的值, 仅保留第一个

函数功能,参数和意义同 dramkit.gentools.replace_repeat_iter()
区别在于计算时在pandas.DataFrame里面进行而不是采用迭代方式,同时取消了gap 参数(即连续出现的val值只保留第一个)

replace_repeat_func_iter

dramkit.gentools.replace_repeat_func_iter(series, func_val, func_val0, gap=None, keep_last=False)
替换序列中重复出现的值,功能与 dramkit.gentools.replace_repeat_iter() 类似,只不过把val和val0的值由直接指定换成了由指定函数生成
func_val 函数用于判断连续条件,其返回值只能是True或False,
func_val0 函数用于生成替换的新值。
即series中若步长为gap的范围内出现多个满足func_val函数为True的值, 则只保留第一条记录,后面的替换为函数func_val0的值。
若gap为None,则将连续出现的满足func_val函数为True的值只保留第一个,其余替换为函数 func_val0的值(这里连续出现是指不出现除了满足func_val为True和等于func_val0函数值 之外的其他值)

返回结果为替换之后的series (pd.Series)

Examples

>>> data = pd.DataFrame({'y': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, 1,
...                            1, 1, 0, 0, -1, -1, 0, 0, 1]})
>>> data['y_rep'] = replace_repeat_func_iter(
...                 data['y'], lambda x: x < 1, lambda x: 3, gap=None)
>>> data
    y  y_rep
0   0      0
1   1      1
2   1      1
3   0      0
4  -1      3
5  -1      3
6   2      2
7  -1     -1
8   1      1
9   0      0
10  1      1
11  1      1
12  1      1
13  0      0
14  0      3
15 -1      3
16 -1      3
17  0      3
18  0      3
19  1      1

replace_repeat_func_pd

dramkit.gentools.replace_repeat_func_pd(series, func_val, func_val0, keep_last=False)

替换序列中重复出现的值, 仅保留第一个

函数功能,参数和意义同 dramkit.gentools.replace_repeat_func_iter()
区别在于计算时在pandas.DataFrame里面进行而不是采用迭代方式
同时取消了gap参数(即连续出现的满足func_val为True的值只保留第一个)

con_count

dramkit.gentools.con_count(series, func_cond, via_pd=True)

计算series(pd.Series)中连续满足func_cond函数指定的条件的记录数

Parameters:
  • series (pd.Series) – 目标序列

  • func_cond (function) – 指定条件的函数,func_cond(x)返回结果只能为True或False

  • via_pd (bool) – 若via_pd为False,则计算时使用循环迭代,否则在pandas.DataFrame里面进行计算

Returns:

pd.Series - 返回连续计数结果

Examples

>>> df = pd.DataFrame([0, 0, 1, 1, 0, 0, 1, 1, 1], columns=['series'])
>>> func_cond = lambda x: True if x == 1 else False
>>> df['count1'] = con_count(df['series'], func_cond, True)
>>> df
   series  count1
0       0       0
1       0       0
2       1       1
3       1       2
4       0       0
5       0       0
6       1       1
7       1       2
8       1       3
>>> df['count0'] = con_count(df['series'], lambda x: x != 1, False)
>>> df
   series  count1  count0
0       0       0       1
1       0       0       2
2       1       1       0
3       1       2       0
4       0       0       1
5       0       0       2
6       1       1       0
7       1       2       0
8       1       3       0

con_count_ignore

dramkit.gentools.con_count_ignore(series, func_cond, via_pd=True, func_ignore=None)

dramkit.gentools.con_count() 的基础上增加连续性判断条件:

当series中的值满足func_ignore函数值为True时,不影响连续性判断(func_ignore 默认为 lambda x: isnull(x))

get_preval_func_cond

dramkit.gentools.get_preval_func_cond(data, col_val, col_cond, func_cond)
获取上一个满足指定条件的行中col_val列的值,条件为:
该行中col_cond列的值x满足func_cond(x)为True (func_cond(x)返回结果只能为True或False)
返回结果为 pd.Series

Examples

>>> data = pd.DataFrame({'x1': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, 1, 1, 1,
...                             0, 0, -1, -1, 0, 0, 1],
...                      'x2': [0, 1, 1, 0, -1, -1, 1, -1, 1, 0, 1, 1, 1,
...                             0, 0, -1, -1, 0, 0, 1]})
>>> data['x1_pre'] = get_preval_func_cond(data, 'x1', 'x2', lambda x: x != 1)
>>> data
    x1  x2  x1_pre
0    0   0     NaN
1    1   1     0.0
2    1   1     0.0
3    0   0     0.0
4   -1  -1     0.0
5   -1  -1    -1.0
6    2   1    -1.0
7   -1  -1    -1.0
8    1   1    -1.0
9    0   0    -1.0
10   1   1     0.0
11   1   1     0.0
12   1   1     0.0
13   0   0     0.0
14   0   0     0.0
15  -1  -1     0.0
16  -1  -1    -1.0
17   0   0    -1.0
18   0   0     0.0
19   1   1     0.0

gap_count

dramkit.gentools.gap_count(series, func_cond, via_pd=True)

计算series (pd.Series)中当前行距离上一个满足 func_cond 函数指定条件记录的行数

func_cond为指定条件的函数,func_cond(x)返回结果只能为True或False, 若via_pd为False,则使用循环迭代,若via_pd为True,则在pandas.DataFrme内计算 返回结果为 pd.Series

Examples

>>> df = pd.DataFrame([0, 1, 1, 0, 0, 1, 1, 1], columns=['series'])
>>> func_cond = lambda x: True if x == 1 else False
>>> df['gap1'] = gap_count(df['series'], func_cond, True)
>>> df
   series  gap1
0       0     0
1       1     0
2       1     1
3       0     1
4       0     2
5       1     3
6       1     1
7       1     1
>>> df['gap0'] = gap_count(df['series'], lambda x: x != 1, False)
>>> df
   series  gap1  gap0
0       0     0     0
1       1     0     1
2       1     1     2
3       0     1     3
4       0     2     1
5       1     3     1
6       1     1     2
7       1     1     3

count_between_gap

dramkit.gentools.count_between_gap(data, col_gap, col_count, func_gap, func_count, count_now_gap=False, count_now=True, via_pd=True)

计算data (pandas.DataFrame)中当前行与上一个满足 func_gap 函数为True的行之间, 满足 func_count 函数为True的记录数

函数func_gap作用于 col_gap 列,func_count作用于 col_count 列, 两者返回值均为True或False
count_now_gap 设置满足func_gap的行是否参与计数,若为False, 则该行计数为0,若为True,则该行按照上一次计数的最后一次计数处理

Todo

增加count_now_gap的处理方式:

  • 该行计数为0

  • 该行按上一次计数的最后一次计数处理

  • 该行按下一次计数的第一次计数处理

count_now 设置当当前行满足func_count时,从当前行开始对其计数还是从下一行开始对其计数

Note

注:当前行若满足同时满足func_gap和func_count,对其计数的行不会为下一行 (即要么不计数,要么在当前行对其计数)

若via_pd为True,则调用 count_between_gap_pd() 实现,否则用 count_between_gap_iter()

返回结果为 pd.Series

Examples

>>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1,
...                                 1, 0, 0, -1, -1, 0, 0, 1],
...                      'to_count': [0, 1, 1, 0, -1, -1, 1, -1, 1, 0, 1,
...                                   1, 1, 0, 0, -1, -1, 0, 0, 1]})
>>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count',
...                                       lambda x: x == -1, lambda x: x == 1,
...                                       count_now_gap=False, count_now=False)
>>> data
        to_gap  to_count  gap_count
0        0         0          0
1        1         1          0
2        1         1          0
3        0         0          0
4       -1        -1          0
5       -1        -1          0
6        2         1          0
7       -1        -1          0
8        1         1          0
9        0         0          1
10      -1         1          0
11       1         1          0
12       1         1          1
13       0         0          2
14       0         0          2
15      -1        -1          0
16      -1        -1          0
17       0         0          0
18       0         0          0
19       1         1          0
>>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1,
                                    1, 0, 0, -1, -1, 0, 0, 1, -1, -1],
                         'to_count': [0, 1, 1, 0, -1, -1, 1, -1, 1, 0, 1,
                                      1, 1, 0, 0, -1, 1, 0, 1, 1, 1, 1]})
>>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count',
                                          lambda x: x == -1, lambda x: x == 1,
                                          count_now_gap=False, count_now=True)
>>> data
        to_gap  to_count  gap_count
0        0         0          0
1        1         1          0
2        1         1          0
3        0         0          0
4       -1        -1          0
5       -1        -1          0
6        2         1          1
7       -1        -1          0
8        1         1          1
9        0         0          1
10      -1         1          0
11       1         1          1
12       1         1          2
13       0         0          2
14       0         0          2
15      -1        -1          0
16      -1         1          0
17       0         0          0
18       0         1          1
19       1         1          2
20      -1         1          0
21      -1         1          0
>>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1,
                                    1, 0, 0, -1, -1, 0, 0, 1, -1, -1],
                         'to_count': [0, -1, -1, 0, -1, -1, 1, -1, 1, 0, 1, 1,
                                      1, 0, 0, -1, -1, 0, -1, 1, 1, 1]})
>>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count',
                                          lambda x: x == -1, lambda x: x == 1,
                                          count_now_gap=True, count_now=False)
>>> data
        to_gap  to_count  gap_count
0        0         0          0
1        1        -1          0
2        1        -1          0
3        0         0          0
4       -1        -1          0
5       -1        -1          0
6        2         1          0
7       -1        -1          1
8        1         1          0
9        0         0          1
10      -1         1          1
11       1         1          0
12       1         1          1
13       0         0          2
14       0         0          2
15      -1        -1          2
16      -1        -1          0
17       0         0          0
18       0        -1          0
19       1         1          0
20      -1         1          1
21      -1         1          0
>>> data = pd.DataFrame({'to_gap': [0, 1, 1, 0, -1, -1, 2, -1, 1, 0, -1, 1,
                                    1, 0, 0, -1, -1, 0, 0, 1, -1, -1],
                         'to_count': [0, -1, -1, 0, -1, -1, 1, -1, 1, 0, 1, 1,
                                      1, 0, 0, -1, -1, 0, -1, 1, 1, 1]})
>>> data['gap_count'] = count_between_gap(data, 'to_gap', 'to_count',
                                          lambda x: x == -1, lambda x: x == 1,
                                          count_now_gap=True, count_now=True)
>>> data
        to_gap  to_count  gap_count
0        0         0          0
1        1        -1          0
2        1        -1          0
3        0         0          0
4       -1        -1          0
5       -1        -1          0
6        2         1          1
7       -1        -1          1
8        1         1          1
9        0         0          1
10      -1         1          2
11       1         1          1
12       1         1          2
13       0         0          2
14       0         0          2
15      -1        -1          2
16      -1        -1          0
17       0         0          0
18       0        -1          0
19       1         1          1
20      -1         1          2
21      -1         1          1

count_between_gap_pd

dramkit.gentools.count_between_gap_pd(data, col_gap, col_count, func_gap, func_count, count_now_gap=True, count_now=True)

参数和功能说明见 dramkit.gentools.count_between_gap() 函数

count_between_gap_iter

dramkit.gentools.count_between_gap_iter(data, col_gap, col_count, func_gap, func_count, count_now_gap=True, count_now=True)

参数和功能说明见 dramkit.gentools.count_between_gap() 函数

val_gap_cond

dramkit.gentools.val_gap_cond(data, col_val, col_cond, func_cond, func_val, to_cal_col=None, func_to_cal=None, val_nan=nan, contain_1st=False)

计算data (pandas.DataFrame)中从上一个 col_cond 列满足 func_cond 函数的行 到当前行, col_val 列记录的 func_val 函数值

func_cond作用于col_cond列,func_cond(x)返回True或False,x为单个值
func_val函数作用于col_val列,func_val(x)返回单个值,x为np.array或pd.Series或列表等
func_to_cal作用于to_cal_col列,只有当前行func_to_cal值为True时才进行func_val计算, 否则返回结果中当前行值设置为val_nan
contain_1st设置func_val函数计算时是否将上一个满足func_cond的行也纳入计算

Todo

参考 dramkit.gentools.count_between_gap() 的设置:

  • 设置col_cond列满足func_cond函数的行,其参与func_val函数的前一次计算还是下一次计算还是不参与计算

Examples

>>> data = pd.DataFrame({'val': [1, 2, 5, 3, 1, 7 ,9],
...                      'sig': [1, 1, -1, 1, 1, -1, 1]})
>>> data['val_pre1'] = val_gap_cond(data, 'val', 'sig',
...                    lambda x: x == -1, lambda x: max(x))
>>> data
   val  sig  val_pre1
0    1    1       NaN
1    2    1       NaN
2    5   -1       NaN
3    3    1       3.0
4    1    1       3.0
5    7   -1       7.0
6    9    1       9.0

filter_by_func_prenext

dramkit.gentools.filter_by_func_prenext(l, func_prenext)

l (list)进行过滤,过滤后返回的 lnew (list)任意前后相邻两个元素满足:

func_prenext(lnew[i], lnew[i+1]) = True

过滤过程为:将 l 的第一个元素作为起点,找到其后第一个满足 func_prenext 函数 值为True的元素,再以该元素为起点往后寻找…

Examples

>>> l = [1, 2, 3, 4, 1, 1, 2, 3, 6]
>>> func_prenext = lambda x, y: (y-x) >= 2
>>> filter_by_func_prenext(l, func_prenext)
[1, 3, 6]
>>> l = [1, 2, 3, 4, 1, 5, 1, 2, 3, 6]
>>> filter_by_func_prenext(l, func_prenext)
[1, 3, 5]
>>> filter_by_func_prenext(l, lambda x, y: y == x+1)
[1, 2, 3, 4]
>>> l = [(1, 2), (2, 3), (4, 1), (5, 0)]
>>> func_prenext = lambda x, y: abs(y[-1]-x[-1]) == 1
>>> filter_by_func_prenext(l, func_prenext)
[(1, 2), (2, 3)]

filter_by_func_prenext_series

dramkit.gentools.filter_by_func_prenext_series(series, func_prenext, func_ignore=None, val_nan=nan)

对series (pandas.Series)调用 filter_by_func_prenext 函数进行过滤, 其中满足 func_ignore 函数为True的值不参与过滤,func_ignore函数默认为: lambda x: isnull(x)

series中 被过滤的值 在返回结果中用 val_nan 替换, 不参与过滤 的值保持不变

Examples

>>> series = pd.Series([1, 2, 3, 4, 1, 1, 2, 3, 6])
>>> func_prenext = lambda x, y: (y-x) >= 2
>>> filter_by_func_prenext_series(series, func_prenext)
0    1.0
1    NaN
2    3.0
3    NaN
4    NaN
5    NaN
6    NaN
7    NaN
8    6.0
>>> series = pd.Series([1, 2, 0, 3, 0, 4, 0, 1, 0, 0, 1, 2, 3, 6],
...                    index=range(14, 0, -1))
>>> filter_by_func_prenext_series(series, func_prenext, lambda x: x == 0)
14    1.0
13    NaN
12    0.0
11    3.0
10    0.0
9     NaN
8     0.0
7     NaN
6     0.0
5     0.0
4     NaN
3     NaN
2     NaN
1     6.0

df_na2value

dramkit.gentools.df_na2value(df, value=None)

copy_df_structure

dramkit.gentools.copy_df_structure(df)

复制df的结构到一个空的dataframe

get_tmp_new

dramkit.gentools.get_tmp_new(exists, tmp_new, ext='_')

以tmp_new为基础生成一个不在exists的值

get_tmp_col

dramkit.gentools.get_tmp_col(df, tmp_col_name)

以tmp_col_name为基础生成一个不在df的列名中的列

merge_df

dramkit.gentools.merge_df(df_left, df_right, same_keep='left', **kwargs)

pd.merge 上改进,相同列名时自动去除重复的

Parameters:
  • df_left (pandas.DataFrame) – 待merge左表

  • df_right (pandas.DataFrame) – 待merge右表

  • same_keep (str) – 可选’left’, ‘right’,设置相同列保留左边df还是右边df

  • **kwargs – pd.merge接受的其他参数

Returns:

pandas.DataFrame - 返回merge之后的数据表

update_df

dramkit.gentools.update_df(df_old, df_new, idcols=None, del_dup_cols=None, rep_keep='new', sort_cols=None, ascendings=True, method='merge', logger=None)
合并df_new到df_old
注意:df_old和df_new不应该设置index
注意:用merge处理当数据量大时占空间很大,method应设置为`concat`

Todo

增加对各列的类型进行指定的参数设置

Examples

>>> df_old = pd.DataFrame({'id1': [1, 2, 3, 4, 5],
...                        'id2': [2, 3, 4, 5, 6],
...                        'col1': ['a', 'b', 'c', 'd', 'e'],
...                        'col2': [2, 4, 6, 8, 10]})
>>> df_new = pd.DataFrame({'id1': [3, 4, 5, 6, 7],
...                        'id2': [4, 5, 6, 7, 8],
...                        'col1': ['c', 'ddd', np.nan, 'f', 'g'],
...                        'col3': [6, 8, 10, 12, 14]})
... idcols = ['id1', 'id2']
... rep_keep = 'new'
... del_dup_cols = None#['id1', 'id2']
>>> a = update_df(df_old, df_new,
...               idcols=idcols,
...               del_dup_cols=del_dup_cols,
...               rep_keep=rep_keep,
...               method='merge')
>>> b = update_df(df_old, df_new,
...               idcols=idcols,
...               del_dup_cols=del_dup_cols,
...               rep_keep=rep_keep,
...               method='concat')

dfs_concat_axis1

dramkit.gentools.dfs_concat_axis1(dfs_list, idcols=None, ascending=True)

多个df列表横向拼接

cut_df_by_con_val

dramkit.gentools.cut_df_by_con_val(df, by_col, func_eq=None)

根据 by_col 列的值,将 df (pandas.DataFrame) 切分为多个子集列表,返回 list

切分依据:func_eq 函数作用于 by_col 列,函数值连续相等的记录被划分到一个子集中

Examples

>>> df = pd.DataFrame({'val': range(0,10),
...                    'by_col': ['a']*3+['b']*2+['c']*1+['a']*3+['d']*1})
>>> df.index = ['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q']
>>> cut_df_by_con_val(df, 'by_col')
[   val by_col
 z    0      a
 y    1      a
 x    2      a,
    val by_col
 w    3      b
 v    4      b,
    val by_col
 u    5      c,
    val by_col
 t    6      a
 s    7      a
 r    8      a,
    val by_col
 q    9      d]

get_con_start_end

dramkit.gentools.get_con_start_end(series, func_con)

找出series (pandas.Series)中值连续满足 func_con 函数值为True的分段起止位置, 返回起止位置对列表

Examples

>>> series = pd.Series([0, 1, 1, 0, 1, 1, 0, -1, -1, 0, 0, -1, 1, 1, 1, 1, 0, -1])
>>> start_ends = get_con_start_end(series, lambda x: x == -1)
>>> start_ends
[[7, 8], [11, 11], [17, 17]]
>>> start_ends = get_con_start_end(series, lambda x: x == 1)
>>> start_ends
[[1, 2], [4, 5], [12, 15]]

cut_range_to_subs

dramkit.gentools.cut_range_to_subs(n, gap)

range(0, n) 切分成连续相接的子集: [range(0, gap), range(gap, 2*gap), ...]

cut_to_subs

dramkit.gentools.cut_to_subs(l, gap)

将列表分段,gap指定每段的个数

check_l_allin_l0

dramkit.gentools.check_l_allin_l0(l, l0)

判断 l (list) 中的值是否都是 l0 (list) 中的元素, 返回True或False

Examples

>>> l = [1, 2, 3, -1, 0]
>>> l0 = [0, 1, -1]
>>> check_l_allin_l0(l, l0)
False
>>> l = [1, 1, 0, -1, -1, 0, 0]
>>> l0 = [0, 1, -1]
>>> check_l_in_l0(l, l0)
True

check_exist_data

dramkit.gentools.check_exist_data(df, x_list, cols=None)

依据指定的 cols 列检查 df (pandas.DataFrame) 中是否已经存在 x_list (list) 中的记录, 返回list,每个元素值为True或False

Examples

>>> df = pd.DataFrame([['1', 2, 3.1, ], ['3', 4, 5.1], ['5', 6, 7.1]],
...                   columns=['a', 'b', 'c'])
>>> x_list, cols = [[3, 4], ['3', 4]], ['a', 'b']
>>> check_exist_data(df, x_list, cols=cols)
[False, True]
>>> check_exist_data(df, [['1', 3.1], ['3', 5.1]], ['a', 'c'])
[True, True]

isnull

dramkit.gentools.isnull(x)

判断x是否为无效值(None, nan, x != x),若是无效值,返回True,否则返回False

x_div_y

dramkit.gentools.x_div_y(x, y, v_x0=None, v_y0=0, v_xy0=1)

x除以y

  • v_xy0为当x和y同时为0时的返回值

  • v_y0为当y等于0时的返回值

  • v_x0为当x等于0时的返回值

power

dramkit.gentools.power(a, b, return_real=True)

计算a的b次方,return_real设置是否只返回实属部分

log

dramkit.gentools.log(x, bottom=None)

计算对数, bottom指定底

cal_pct

dramkit.gentools.cal_pct(v0, v1, vv00=1, vv10=- 1)

计算从v0到v1的百分比变化

  • vv00为当v0的值为0且v1为正时的返回值,v1为负时取负号

  • vv10为当v1的值为0且v0为正时的返回值,v0为负时取负号

Todo

正负无穷大处理

pct_change

dramkit.gentools.pct_change(series, lag=1)

计算series百分比变化

Todo

分子分母为0或正负无穷大处理

s1divs2

dramkit.gentools.s1divs2(series1, series2, vs20=nan)

两个序列相比

min_com_multer

dramkit.gentools.min_com_multer(l)

求一列数 l (list) 的最小公倍数,支持负数和浮点数

max_com_divisor

dramkit.gentools.max_com_divisor(l)

求一列数 l (list) 的最大公约数,只支持正整数

Note

只支持正整数

mcd2_tad

dramkit.gentools.mcd2_tad(a, b)

辗转相除法求a和b的最大公约数,a、b为正数

Note

  • a, b应为正数

  • a, b为小数时由于精度问题会不正确

max_com_divisor_tad

dramkit.gentools.max_com_divisor_tad(l)

用辗转相除法求一列数 l (list) 的最大公约数, l 元素均为正数

Note

  • l元素均为正数

  • l元素为小数时由于精度问题会不正确

References

https://blog.csdn.net/weixin_45069761/article/details/107954905

df_rows2cols

dramkit.gentools.df_rows2cols(df, col_name, col_val_name, fill_value=None)

把df中col_value_name列数据按col_name列分组,通过unstack拆分为多列数据,新列名为col_name列中的值

Todo

有nan时的检查和处理

Examples

>>> df0 = pd.DataFrame(
...         [['Saliy', 'midterm', 'class1', 'A'],
...         ['Saliy', 'midterm', 'class3', 'B'],
...         ['Saliy', 'final', 'class1', 'C'],
...         ['Saliy', 'final', 'class3', 'C'],
...         ['Jeff', 'midterm', 'class2', 'D'],
...         ['Jeff', 'midterm', 'class4', 'A'],
...         ['Jeff', 'final', 'class2', 'E'],
...         ['Jeff', 'final', 'class4', 'C'],
...         ['Roger', 'midterm', 'class2', 'C'],
...         ['Roger', 'midterm', 'class5', 'B'],
...         ['Roger', 'final', 'class2', 'A'],
...         ['Roger', 'final', 'class5', 'A'],
...         ['Karen', 'midterm', 'class3', 'C'],
...         ['Karen', 'midterm', 'class4', 'A'],
...         ['Karen', 'final', 'class3', 'C'],
...         ['Karen', 'final', 'class4', 'A'],
...         ['Brain', 'midterm', 'class1', 'B'],
...         ['Brain', 'midterm', 'class5', 'A'],
...         ['Brain', 'final', 'class1', 'B'],
...         ['Brain', 'final', 'class5', 'C']],
...         columns=['name', 'test', 'class', 'grade'])
>>> col_name, col_val_name = 'class', 'grade'
>>> df1 = df_rows2cols(df0, col_name, col_val_name)
>>> df2 = df_rows2cols(df0, col_name, col_val_name,
                       fill_value='None')

df_cols2rows

dramkit.gentools.df_cols2rows(df, cols, col_name, col_val_name, dropna=True)

把df中指定的cols列数据通过stack堆积为行数据,新列名为col_name

Todo

有nan时的检查和处理

Examples

>>> na = [np.nan]
>>> df3 = pd.DataFrame({'name': ['Saliy', 'Saliy', 'Jeff', 'Jeff',
...                              'Roger', 'Roger', 'Karen', 'Karen',
...                              'Brain', 'Brain'],
...                     'test': ['midterm', 'final'] * 5,
...                     'class1': ['A', 'C'] + na*6 + ['B', 'B'],
...                     'class2': na*2 + ['D', 'E', 'C', 'A'] + na*4,
...                     'class3': ['B', 'C'] + na*4 + ['C', 'C'] + na*2,
...                     'class4': na*2 + ['A', 'C'] + na*2 + ['A', 'A'] + na*2,
...                     'class5': na*4 + ['B', 'A'] + na*2 + ['A', 'C']})
>>> cols = ['class%s'%x for x in range(1, 6)]
>>> col_name = 'class'
>>> col_val_name = 'grade'
>>> df4 = df_cols2rows(df3, cols, col_name, col_val_name)
>>> df5 = df_cols2rows(df3, cols, col_name, col_val_name,
...                    dropna=False)

get_full_df

dramkit.gentools.get_full_df(df: DataFrame, full_col: str, fulls: Iterable, idcols: Optional[Union[str, list[str]]] = None, vcols: Optional[Union[str, list[str]]] = None, val_nan: Optional[Any] = None, fill: Optional[FillMethodType] = 'ffill', final_dropna: bool = True, only_fulls: bool = True, method: FullMethodType = 'pivot')

df中full_col列的值扩充到fulls

Example

>>> df = pd.DataFrame({'t': [2, 3, 5, 7, 9],
...                    'a': ['x1', 'x1', 'x3', 'x4', 'x4'],
...                    'b': ['y1', 'y2', 'y3', 'y4', 'y4'],
...                    'c': ['z1', 'z1', 'z5', 'z7', 'z9']})
>>> get_full_df(df, 't', range(1, 11))
>>> get_full_df(df, 't', range(1, 11), idcols='c', method='product')
>>> get_full_df(df, 't', range(1, 11), idcols=['a', 'b'], method='product')
>>> get_full_df(df, 't', range(1, 11), idcols=['a', 'b'])
>>> df['c'] = ['z1', 'z1', np.nan, np.nan, 'z9']
>>> get_full_df(df, 't', range(1, 11), idcols=['a', 'b'], val_nan='npnan')
>>> get_full_df(df, 't', [2, 5, 9, 10], idcols=['a', 'b'], val_nan='npnan', final_dropna=False)
>>> get_full_df(df, 't', [2, 5, 9, 10], idcols=['a', 'b'], val_nan='npnan', final_dropna=True, method='product')

get_full_components_df

dramkit.gentools.get_full_components_df(df: DataFrame, parent_col: str, child_col: str, tincol: str, toutcol: str, tfulls: Iterable, toutnan: DateTimeType, tcol_res: str = 'time', keep_inout: bool = False)
根据纳入、退出日期获取在给定时间内的所有成分

Example

>>> df = pd.DataFrame(
...      {'idxname': ['a', 'a', 'b', 'b', 'a', 'c', 'c'],
...       'stock': ['a1', 'a2', 'b1', 'b2', 'a3', 'c1', 'c2'],
...       'indate': ['20210101', '20210515', '20210405', '20210206',
...                  '20220307', '20220910', '20230409'],
...       'outdate': ['20220305', np.nan, np.nan, '20230209',
...                   np.nan, np.nan, '20230518']})
>>> parent_col, child_col, tincol, toutcol = 'idxname', 'stock', 'indate', 'outdate'
>>> from finfactory.fintools.utils_chn import get_dates_conds
>>> tfulls = get_dates_conds('quarter_end', '20210101')
>>> tfulls = [x.strftime('%Y%m%d') for x in tfulls]
>>> toutnan = '20991231'
>>> df1 = get_full_components_df(
...       df, parent_col, child_col, tincol, toutcol,
...       tfulls, toutnan, tcol_res='time')

get_first_appear

dramkit.gentools.get_first_appear(series, func_cond, reverse=False, return_iloc=False)
获取满足func_cond(x)为True的值在series中第一次出现时的索引
若reverse为True,则是最后一次出现
若return_iloc为True,则返回行号和值,否则返回索引和值

get_appear_order

dramkit.gentools.get_appear_order(series, ascending=True)

标注series (pandas.Series , 离散值)中重复元素是第几次出现,

返回为 pandas.Series,ascending设置返回结果是否按出现次序升序排列

Examples

>>> df = pd.DataFrame({'v': ['A', 'B', 'A', 'A', 'C', 'C']})
>>> df.index = ['a', 'b', 'c', 'd', 'e', 'f']
>>> df['nth'] = get_appear_order(df['v'], ascending=False)
>>> df
   v  nth
a  A    3
b  B    1
c  A    2
d  A    1
e  C    2
f  C    1

label_rep_index_str

dramkit.gentools.label_rep_index_str(df)

df (pandas.DataFrame) 中的index若有重复,对重复的index进行后缀编号,返回新的 pandas.DataFrame

Note

若存在重复的index,则添加后缀编号之后返回的df,其index为str类型

Examples

>>> df = pd.DataFrame([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> label_rep_index_str(df)
    0
0   1
1   2
2   3
3   4
4   5
5   6
6   7
7   8
8   9
9  10
>>> df.index = [0, 0, 0, 1, 1, 2, 2, 2, 2, 3]
>>> label_rep_index_str(df)
      0
0     1
0_2   2
0_3   3
1     4
1_2   5
2     6
2_2   7
2_3   8
2_4   9
3    10

drop_index_duplicates

dramkit.gentools.drop_index_duplicates(df, keep='first')

删除 df (pandas.DataFrame) 中index重复的记录

count_values

dramkit.gentools.count_values(df, cols=None)

计算df列中cols指定列的值出现次数

count_index

dramkit.gentools.count_index(df)

计算df的每个index出现的次数

get_all_turns

dramkit.gentools.get_all_turns(series)

获取序列series中所有的转折点,返回series中-1位低点,1位高点

group_shift

dramkit.gentools.group_shift()

分组shift,待实现

rolling_corr

dramkit.gentools.rolling_corr(df, col1, col2, *args, **kwargs)

滚动相关性(2个变量)

group_fillna

dramkit.gentools.group_fillna(df, col_fill, cols_groupby, return_all=False, **kwargs_fillna)

分组缺失值填充

group_rank

dramkit.gentools.group_rank(df, col_rank, cols_groupby, return_all=False, **kwargs_rank)

分组排序

对df(pandas.DataFrame)中 cols_rank 指定列按 cols_groupby 指定列分组排序

Todo

col_rank可为列表,此时返回dataframe,return_all设置只返回指定列结果或者返回全部dataframe

Parameters:
  • df (pandas.DataFrame) – 待排序数据表

  • col_rank (str) – 需要排序的列

  • cols_groupby (str, list) – 分组依据列

  • **kwargs_rank – pandas中rank函数接受的参数

Returns:

series_rank – 排序结果

Return type:

pandas.Series

get_func_df_concat

dramkit.gentools.get_func_df_concat(func, args_kwargs_list, concat_axis=0)
func(args, **kwargs)作用于args_kwargs_list中的参数,将所有结果concat返回
func函数返回值必须为pd.DataFrame

Examples

>>> func = lambda x, y, z: pd.DataFrame({'a': [x, y, z]})
>>> args_kwargs_list = [[(1, 2), {'z': 3}],
...                     [(2, 3), {'z': 4}]]
>>> get_func_df_concat(func, args_kwargs_list)
   a
0  1
1  2
2  3
0  2
1  3
2  4

df_groupby_func

dramkit.gentools.df_groupby_func(df, by_cols, func, as_index=False, group_keys=False, reset_index='drop', *args_func, **kwargs_func)

Todo

  • by_cols中有nan时的检查和处理

  • group_keys和as_index的异同检查确认

df按by_cols分组作用于func(x, *args, **kwargs)函数
reset_index:
若为drop,则重置返回的index
若为False,则不处理groupby之后的index
若为ori,如返回数据与原始df行数相同,则使用原始index,否则不处理groupby之后的index
解决groupby.apply出现执行多次的问题(貌似pandas更新到1.5.0之后没出现了),参考:

bootstrapping

dramkit.gentools.bootstrapping()

bootstraping, 待实现

groupby_rolling_func

dramkit.gentools.groupby_rolling_func(data, cols_groupby, cols_val, func, keep_index=True, kwargs_rolling={}, kwargs_func={})

data按照cols_groupby分组,然后在cols_val列上rolling调用func, func的参数通过kwargs_func设定, 若cols_val为str,则返回pd.Series;若为list,则返回pd.DataFrame 若keep_index为True,则返回结果中的index与data一样,否则返回结果中的index由 cols_groupby设定

merge_dicts

dramkit.gentools.merge_dicts(dicts)

将多个字典合并成一个字典

get_lists_inter

dramkit.gentools.get_lists_inter(lists)

获取多个列表的交集

list_eq

dramkit.gentools.list_eq(l1, l2, order=True)

判断两个列表l1和l2是否相等,若orde为False,则只要元素相同即判断为相等

get_num_decimal

dramkit.gentools.get_num_decimal(x, ignore_tail0=True)
获取浮点数x的小数位数
ignore_tail0设置是否忽略小数点尾部无效的0

list2dict

dramkit.gentools.list2dict(list_key, list_val)

两个列表生成字典

sort_dict

dramkit.gentools.sort_dict(d, by='key', reverse=False)

对字典排序,by设置依据’key’还是’value’排,reverse同sorted函数参数

insert_into_list

dramkit.gentools.insert_into_list(l, val, loc_val, dirt='before')

在列表l中的loc_val前或后插入val

count_list

dramkit.gentools.count_list(l)

对l中的元素计数,返回dict

tmprint

dramkit.gentools.tmprint(*args, **kwargs)

url2chn

dramkit.gentools.url2chn(url)

将url中的中文乱码(实际上为utf-8)转化为正常url

replace_con_blank

dramkit.gentools.replace_con_blank(string, to)

将string中连续空格替换为to指定字符

change_dict_key

dramkit.gentools.change_dict_key(d, func_key_map)

修改字典d中的key值,即d[k]=v变成d[func_key_map(k)]=v

get_func_arg_info

dramkit.gentools.get_func_arg_info(func)

获取函数func的参数信息

Examples

>>> def func(a:int, b, c:float=1.0, c1=2, *args,
...          e, d=2, **kwargs):
...     pass
>>> get_func_arg_info(func)
{'args': ['a', 'b', 'c', 'c1'],
 'argdefaults': {'c': 1.0, 'c1': 2},
 'varargs': 'args',
 'varkw': 'kwargs',
 'kwonlyargs': ['e', 'd'],
 'kwonlydefaults': {'d': 2},
 'annotations': {'a': int, 'c': float}}

parse_args

dramkit.gentools.parse_args(args_info, description=None)

命令行参数解析

Parameters:

args_info (list, dict) –

参数信息
若无分组,格式为:
[([‘-a’, ‘–arg1’, …], {‘type’: int, ‘default’: 1, …}),
([‘-b’, …], {…}),
…]
若有分组,格式为:
{‘groupname1’:
[([‘-a’, ‘–arg1’, …], {‘type’: int, ‘default’: 1, …}),
([‘-b’, …], {…}),
…],
’groupname2’: […], …}

gen_args_praser

dramkit.gentools.gen_args_praser(func, native_eval_types=('list', 'tuple', 'dict'))

生成func函数的参数解析

get_topn_names

dramkit.gentools.get_topn_names(df, n=1, max_or_min='max', col_or_row='col')

获取df中值最大/小的前n列/行的名称

Examples

>>> d = {'A': [30, 2, 6, 4, 5],
...      'B': [6, 7, 80, 5, 10],
...      'C': [11, 12, 13, 14, 15],
...      'D': [16, 17, 18, 19, 20],
...      'E': [21, 22, 23, 24, 25]}
>>> df = pd.DataFrame(d, index=['a', 'b', 'c', 'd', 'e'])
>>> res = get_topn_names(df, n=2)
>>> res1 = get_topn_names(df, n=2, col_or_row='row')

install_check

install_check

dramkit.install_check.install_check()

检查是否成功安装dramkit

若成功安装,会打印版本号和相关提示信息

iotools

GetInputTimeoutError

class dramkit.iotools.GetInputTimeoutError

Bases: Exception

get_input_timeout_win

dramkit.iotools.get_input_timeout_win(timeout=10, hint_str=None)
windows下设置在等待时间内获取input

get_input_timeout_notwin

dramkit.iotools.get_input_timeout_notwin(timeout=10, hint_str=None)
非windows下设置在等待时间内获取input

get_input_timeout

dramkit.iotools.get_input_timeout(timeout=10, hint_str=None, logger=None)

get_input_with_timeout

dramkit.iotools.get_input_with_timeout(timeout=10, hint_str=None, logger=None)
通过input函数获取输入
若等待时间超过timeout秒没接收到输入,则返回None
hint_str为input函数提示信息

get_input_multi_line

dramkit.iotools.get_input_multi_line(end_word='end', end_char='', hint_str=None, hint_lineno=True)
input读取多行输入(包括回车换行符也会被保留)
end_word设置结束词,当在一行里面输入完整结束词,则表示输入完毕
注:若end_word设置为’’,则空行输入回车时会结束输入, 此时返回结果中上下行之间回车换行符不会被保留
end_char设置结束字符串,当在一行里面任意位置输入end_char,则表示输入完毕, 此时返回在end_char之前输入的内容(不包括end_char和end_char之后的内容)
hint_str设置input函数中的输入提示
若hint_lineno为True,则input函数输入时提示当前是第几行

pickle_file

dramkit.iotools.pickle_file(data, file)

以二进制格式保存数据data到文件file

Parameters:
  • data – 待保存内容

  • file (str) – 保存路径

unpickle_file

dramkit.iotools.unpickle_file(file)

读取二进制格式文件file

Parameters:

file (str) – 待读取文件路径

load_yml

dramkit.iotools.load_yml(fpath, **kwargs_open)

write_yml

dramkit.iotools.write_yml()

写入yml文件

load_json

dramkit.iotools.load_json(fpath, encoding=None, logger=None)

读取json格式文件

Parameters:
  • fpath (str) – 待读取文件路径

  • encoding (str, None) – 文件编码格式,若不指定,则尝试用utf-8和gbk编码读取

  • logger (logging.Logger) – 日志记录器

Returns:

dict - 返回读取数据

write_json

dramkit.iotools.write_json(data, fpath, encoding=None, mode='w', **kwargs)

把data写入json格式文件

Parameters:
  • data (dict) – 待写入数据

  • fpath (str) – 文件保存路径

  • encoding (str, None) – 文件编码格式

read_lines

dramkit.iotools.read_lines(fpath, encoding=None, split=True, logger=None, **kwargs_open)

读取文本文件中的所有行

Parameters:
  • fpath (str) – 待读取文件路径

  • encoding (str, None) – 文件编码格式,若不指定,则尝试用utf-8和gbk编码读取

  • logger (None, logging.Logger) – 日志记录器

Returns:

list - 文本文件中每行内容列表

write_txt

dramkit.iotools.write_txt(lines, file, mode='w', check_end=True, **kwargs)

将列表lines中的内容写入文本文件中

Parameters:
  • lines (list) – 列表,每个元素为一行文本内容

  • file (str) – 保存路径

  • mode (str) – 写入模式,如’w’或’a’

  • check_end (bool) – 是否检查每行行尾换行符,默认若行尾已经有换行符,则不再新添加换行符

  • **kwargsopen 函数接受的关键字,如encoding等

Note

不同系统文本文件每行默认结尾字符不同:

  • linux下一般以`\n`结尾

  • windows下一般以`\r\n`结尾

  • 苹果系统一般以`\r`结尾

load_text

dramkit.iotools.load_text(fpath, sep=',', del_first_line=False, del_first_col=False, to_pd=True, keep_header=True, encoding=None, del_last_col=False, logger=None)

读取文本文件数据,要求文件中每行存放一个数据样本

Parameters:
  • fpath (str) – 文本文件路径

  • sep (str) – 字段分隔符,默认`,`

  • del_first_line (bool) –

    是否删除首行,默认不删除

    Note

    若del_first_line为True,则输出pandas.DataFrame没有列名

  • del_first_col (bool) – 是否删除首列,默认不删除

  • to_pd (bool) – 是否输出为pandas.DataFrame,默认是

  • keep_header (bool) – 输出为pandas.DataFrame时是否以首行作为列名,默认是

  • encoding (str, None) – 指定编码方式,默认不指定时会尝试以uft-8和gbk编码读取

  • del_last_col (bool) – 是否删除最后一列,默认否

  • logger (logging.Logger, None) – 日志记录器

Returns:

list, pandas.DataFrame - 返回读取的数据

df2file_pd

dramkit.iotools.df2file_pd(df, fpath, ftype=None, **kwargs)

pandas.DataFrame写入文件

process_df_pd_big

dramkit.iotools.process_df_pd_big(fpath: str, ftype: Optional[str] = None, chunksize: int = 100000, del_unname_cols: bool = True, logger: Optional[Logger] = None, read_kwargs: dict = {}, process_func=None, process_args: tuple = (), process_kwargs: dict = {}, return_df: bool = True)

用pandas读取大数据文件并分批处理,返回处理后的结果

load_df_pd

dramkit.iotools.load_df_pd(fpath: str, ftype: Optional[str] = None, del_unname_cols: bool = True, encoding: Optional[str] = None, logger: Optional[Logger] = None, **kwargs)

用pandas读取文件,返回DataFrame

load_csv

dramkit.iotools.load_csv(fpath, del_unname_cols=True, encoding=None, logger=None, **kwargs)

用pandas读取csv数据

Parameters:
  • fpath (str) – csv文件路径

  • del_unname_cols (bool) – 是否删除未命名列,默认删除

  • encoding (str, None) – 指定编码方式,默认不指定,不指定时会尝试以uft-8和gbk编码读取

  • logger (logging.Logger, None) – 日志记录器

  • **kwargs – 其它 pd.read_csv 支持的参数

Returns:

pandas.DataFrame - 读取的数据

load_dfs_pd

dramkit.iotools.load_dfs_pd(fpaths, kwargs_sort={}, kwargs_drop_dup={}, mark_file=False, **kwargs_loaddf)

读取指定路径列表中所有的dataframe数据文件,整合到一个df里面

Parameters:
  • fpaths (list) – 文件夹路径列表

  • kwargs_sort (dict) – 设置sort_values接受的排序参数

  • kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数

  • **kwargs_loaddfdramkit.iotools.load_df_pd() 接受的其它参数

Returns:

pandas.DataFrame - 读取的数据

load_csvs

dramkit.iotools.load_csvs(fpaths, kwargs_sort={}, kwargs_drop_dup={}, **kwargs_loadcsv)

读取指定路径列表中所有的csv文件,整合到一个df里面

Parameters:
  • fpaths (list) – 文件夹路径列表

  • kwargs_sort (dict) – 设置sort_values接受的排序参数

  • kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数

  • **kwargs_loadcsvdramkit.iotools.load_dfs_pd() 接受的其它参数

Returns:

pandas.DataFrame - 读取的数据

load_csvs_dir

dramkit.iotools.load_csvs_dir(fdir, kwargs_sort={}, kwargs_drop_dup={}, **kwargs_loadcsv)

读取指定文件夹中所有的csv文件,整合到一个df里面

Parameters:
  • fdir (str) – 文件夹路径

  • kwargs_sort (dict) – 设置sort_values接受的排序参数

  • kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数

  • **kwargs_loadcsvdramkit.iotools.load_dfs_pd() 接受的其它参数

Returns:

pandas.DataFrame - 读取的数据

load_excels

dramkit.iotools.load_excels(fdirorfpaths, kwargs_sort={}, kwargs_drop_dup={}, **kwargs_readexcel)

读取指定文件夹中所有的excel文件,整合到一个df里面

Parameters:
  • fdirorfpaths (str, list) – Excel文件所在文件夹路径或Excel文件路径列表

  • kwargs_sort (dict) – 设置sort_values接受的排序参数

  • kwargs_drop_dup (dict) – 设置drop_duplicates接受的去重参数

  • **kwargs_readexceldramkit.iotools.load_dfs_pd 接受的其它参数

Returns:

pandas.DataFrame - 读取的数据

archive_df

dramkit.iotools.archive_df(df_old, df_new, idcols=None, del_dup_cols=None, rep_keep='new', sort_cols=None, ascendings=True, old_ftype=None, new_ftype=None, save_ftype=None, save_path=None, kwargs_read_old={}, kwargs_read_new={}, kwargs_save={'index': None}, method='merge', logger=None)

合并df_new和df_old,再排序、去重、写入csv或excel

add_ext_to_filename

dramkit.iotools.add_ext_to_filename(fpath, ext)

在fpath路径的尾部(扩展名的前面)添加尾缀ext

cut_dffile_by_maxline

dramkit.iotools.cut_dffile_by_maxline(fpath, max_line=10000, kwargs_loaddf={}, kwargs_tofile={'index': None})

get_fpath_ext_num

dramkit.iotools.get_fpath_ext_num(fpath)
给定路径fpath,若不存在,则返回fpath,若存在则返回带数字在后缀的路径
比如fpath=xxx.y存在,则返回xxx_1.y,xxx_1.y也存在,则返回xxx_2.y

cut_bigdffile_by_maxline

dramkit.iotools.cut_bigdffile_by_maxline(fpath, max_line=100000, kwargs_loaddf={}, kwargs_tofile={'index': None})

cut_dffile_by_year

dramkit.iotools.cut_dffile_by_year(fpath, tcol=None, name_last_year=False, kwargs_loaddf={}, kwargs_tofile={'index': None}, kwargs_exists_merge={}, logger=None)

dataframe大文件分割,按年份,tcol指定时间列

Todo

指定几年的合并为一个文件,而不是固定每一年单独一个文件

cut_csv_by_year

dramkit.iotools.cut_csv_by_year(fpath, tcol=None, name_last_year=False, kwargs_loadcsv={}, kwargs_tocsv={'index': None}, logger=None)

csv大文件分割,按年份,tcol指定时间列

clear_text_file

dramkit.iotools.clear_text_file(fpath, encoding=None)

清空文本文件中的内容,保留文件

clear_specified_type_files

dramkit.iotools.clear_specified_type_files(dir_path, type_list, encoding=None, recu_sub_dir=False, **kwargs_getpath)

清空dir_path文件夹下所有类型在type_list中的文件内容,保留文件夹

get_all_paths

dramkit.iotools.get_all_paths(dir_path, ext=None, start=None, include_dir=False, abspath=False, recu_sub_dir=True, only_dir=False, name_func=None)

获取指定文件夹(及其子文件夹)中所有的文件路径

Parameters:
  • dir_path (str) – 文件夹路径

  • ext (None, str, list, tuple) – 指定文件后缀列表,若为None,则包含所有类型文件

  • start (None, str, list, tuple) – 指定文件前缀名列表,若为None,则包含所有类型文件

  • include_dir (bool) – 返回结果中是否包含文件夹路径(包含返回文件路径的文件夹),默认不包含(即只返回文件路径)

  • abspath (bool) – 是否返回绝对路径,默认返回相对路径

  • rec_sub_dir (bool) – 若为False,则不对子文件中的文件进行遍历,即只返回dir_path下的文件(夹)路径

  • only_dir (bool) – 若为True,则只返回子文件夹路径(文件夹名称尾部与ext指定尾部相同的文件夹), 不返回文件路径,此时include_dir不起作用

  • name_func (function) – 判断是否符合文件名筛选的函数,即满足name_func(fpath)值为True的路径被保留, name_func参数为路径,返回结果只能为True或False

Returns:

list - 文件路径列表

del_specified_type_files

dramkit.iotools.del_specified_type_files(dir_path, type_list, recu_sub_dir=True, **kwargs_getpath)

删除dir_path文件夹下所有类型在type_list中的文件

del_dir

dramkit.iotools.del_dir(dir_path)

删除dir_path指定文件夹及其所有内容

del_specified_subdir

dramkit.iotools.del_specified_subdir(dir_path, del_names, recu_sub_dir=True, **kwargs_getpath)

删除dir_path文件夹下所有文件夹名尾缀在del_names中的子文件夹

copy_file_to_dir

dramkit.iotools.copy_file_to_dir(src_path, tgt_dir, force=True)

复制文件

copy_file_to_file

dramkit.iotools.copy_file_to_file(src_path, tgt_path, force=True)

文件复制(指定文件名)

copy_dir_structure

dramkit.iotools.copy_dir_structure(src_dir, tgt_dir, ext=None, recu_sub_dir=True, keep_root_same=True, return_map=False, **kwargs_getpath)

复制文件夹结构,不复制里面的文件

copy_dir

dramkit.iotools.copy_dir(src_dir, tgt_dir, ext_file=None, ext_dir=None, recu_sub_dir=True, force=True, keep_root_same=True, keep_empty_dir_when_not_recu=False, kwargs_getpath_dir={}, kwargs_getpath_file={})

复制整个文件夹及其内容

move_file_to_file

dramkit.iotools.move_file_to_file(src_path, tgt_path, force=True)

移动文件(指定文件名)

move_file_to_dir

dramkit.iotools.move_file_to_dir(src_path, tgt_dir, force=True)

移动文件

move_dir

dramkit.iotools.move_dir(src_dir, tgt_dir, ext_file=None, ext_dir=None, recu_sub_dir=True, force=True, keep_root_same=True, keep_empty_dir_when_not_recu=False, kwargs_getpath_dir={}, kwargs_getpath_file={})

移动整个文件夹及其内容

move_specified_type_files

dramkit.iotools.move_specified_type_files(dir_path, type_list, target_dir, recu_sub_dir=True)

移动dir_path文件夹下所有类型在type_list中的文件到target_dir中

make_dir

dramkit.iotools.make_dir(dir_path)

新建文件夹

make_file_dir

dramkit.iotools.make_file_dir(fpath)

若路径fpath所指文件夹不存在,创建之

make_path_dir

dramkit.iotools.make_path_dir(fpath)

若fpath所指文件夹路径不存在,则新建之

get_last_change_time

dramkit.iotools.get_last_change_time(fpath, strformat='%Y-%m-%d %H:%M:%S')

获取文件的最后修改时间

get_file_info

dramkit.iotools.get_file_info(fpath)

获取文件信息

get_files_info

dramkit.iotools.get_files_info(fpaths)

获取列表中的文件信息

get_files_info_dir

dramkit.iotools.get_files_info_dir(dir_path, **kwargs_getpath)
获取dir_path文件夹中所有文件的信息
**kwargs为 dramkit.iotoos.get_all_paths() 接受的参数

py2pyc

dramkit.iotools.py2pyc(py_path, pyc_path=None, force=True, del_py=False, **kwargs_compile)

py文件编译为pyc文件

py2pyc_dir

dramkit.iotools.py2pyc_dir(dir_path, force=True, del_py=False, recu_sub_dir=True, kwargs_compile={}, kwargs_getpath={})

将一个文件夹下的所有.py文件编译为.pyc文件

pyc2py

dramkit.iotools.pyc2py(pyc_path, py_path=None, force=True, del_pyc=False, logger=None)

pyc文件反编译为py

pyc2py_dir

dramkit.iotools.pyc2py_dir(dir_path, force=True, del_pyc=False, recu_sub_dir=True, logger=None, **kwargs_getpath)

将一个文件夹下的所有.pyc文件反编译为.py文件

get_mac_address

dramkit.iotools.get_mac_address()

获取Mac地址,返回大写地址,如:F8-A2-D6-CC-BB-AA

get_ipconfig_all_win

dramkit.iotools.get_ipconfig_all_win(logger=False)

get_ipconfig_win

dramkit.iotools.get_ipconfig_win(logger=False)

get_hardware_ids

dramkit.iotools.get_hardware_ids()
获取电脑硬件序列号信息,包括主板和硬盘序列号、MAC地址、BIOS序列号等

get_ip1

dramkit.iotools.get_ip1()

get_ip2

dramkit.iotools.get_ip2()

get_ip_public

dramkit.iotools.get_ip_public()
获取公网ip

get_ip_public2

dramkit.iotools.get_ip_public2()

get_tasks_info_win

dramkit.iotools.get_tasks_info_win()

windows下获取任务信息

get_ports_info_win

dramkit.iotools.get_ports_info_win()

windows下获取端口信息

zip_files

dramkit.iotools.zip_files(zip_path, fpaths, keep_ori_path=True, keep_zip_new=True)

使用zipfile将指定路径列表(不包括子文件(夹)内容)打包为.zip文件

Parameters:
  • zip_path (str) – zip压缩包保存路径

  • fpaths (list) – 需要压缩的路径列表(应为相对路径)

  • keep_ori_path (bool) –

    • 若为True, 则压缩文件会保留fpaths中文件的原始路径

    • 若为False, 则fpaths中所有文件在压缩文件中都在统一根目录下

  • keep_zip_new (bool) – 若为True,将覆盖已有压缩文件,否则在已有文件里面新增

zip_fpath

dramkit.iotools.zip_fpath(fpath, zip_path=None, kwargs_zip={}, kwargs_getpath={})

使用zipfile压缩单个文件(夹)下所有内容为.zip文件

Parameters:
  • fpath (str) – 待压缩文件(夹)路径(应为相对路径)

  • zip_path (None, str) – 压缩文件保存路径,若为None,则为fpath路径加后缀

  • **kwargs_zipdramkit.iotools.zip_files() 接受的参数

zip_fpaths

dramkit.iotools.zip_fpaths(zip_path, fpaths, kwargs_zip={}, kwargs_getpath={})

使用zipfile将指定路径列表(包括子文件(夹)所有内容)打包为.zip文件

Parameters:
  • zip_path (str) – zip压缩包保存路径

  • fpaths (list) – 需要压缩的路径列表(可为文件也可为文件夹, 应为相对路径)

  • **kwargs_zipdramkit.iotools.zip_files() 接受的参数

zip_extract

dramkit.iotools.zip_extract(fzip, to_dir=None, replace_exists=True)

用zipfile解压文件

zip_fpath_7z

dramkit.iotools.zip_fpath_7z(fpath, zip_path=None, mode='zip', pwd=None, keep_zip_new=True)

7z命令压缩单个文件(夹)到.zip文件

Parameters:
  • fpath (str) – 待压缩文件(夹)路径

  • zip_path (None, str) – 压缩文件保存路径,若为None,则为fpath路径加后缀

  • mode (str) – 压缩文件后缀,可选[‘7z’, ‘zip’]

  • pwd (str) – 密码字符串

  • keep_zip_new (bool) –

    • 若为True,则zip_path将覆盖原来已经存在的文件

    • 若为False,则zip_path将在原来已有的文件中新增需要压缩的文件

zip_fpaths_7z

dramkit.iotools.zip_fpaths_7z(zip_path, fpaths, mode='zip', pwd=None, keep_zip_new=True)

7z命令压缩多个文件(夹)列表到.zip文件

Parameters:
  • zip_path (str) – zip压缩包保存路径

  • fpaths (list) –

    待压缩文件(夹)路径列表

    Warning

    fpaths太长的时候可能会报错

  • mode (str) – 压缩文件后缀,可选[‘7z’, ‘zip’]

  • pwd (str) – 密码字符串

  • keep_zip_new (bool) –

    • 若为True,则zip_path将覆盖原来已经存在的文件

    • 若为False,则zip_path将在原来已有的文件中新增需要压缩的文件

extract_7z

dramkit.iotools.extract_7z()

7z命令解压文件

cmdrun

dramkit.iotools.cmdrun(cmd_str, logger=None)

调用cmd执行cmd_str命令

cmdruns_multi_process

dramkit.iotools.cmdruns_multi_process(cmd_str_list, logger=None, multi_line=None, keep_order=True)

cmd_run_pys_multi_process

dramkit.iotools.cmd_run_pys_multi_process(py_list, logger=None, multi_line=None, keep_order=True)

cmd_run_pys

dramkit.iotools.cmd_run_pys(py_list, logger=None)

cmd命令批量运行py脚本,logger捕捉日志

rename_files_in_dir

dramkit.iotools.rename_files_in_dir(dir_path, func_rename)

对指定文件夹中的文件进行批量重命名

Parameters:
  • dir_path (str) – 目标文件夹

  • func_rename (function) – 命名规则函数: name_new = func_rename(name)

find_files_include_str

dramkit.iotools.find_files_include_str(target_str, fpaths, re_match=False, return_all_find=False, logger=None)

在指定文件路径列表中,查找哪些文件里面包含了目标字符串

Parameters:
  • target_str (str) – 目标字符串

  • fpaths (str, list) – 文件路径列表

  • re_match (bool) – 若为True,则目标字符串 target_str 按正则表达式处理

  • return_all_find (bool) – 若为True,则返回所有找到的目标字符串,否则只返回第一个

  • logger (None, logging.Logger) – 日志记录器

Returns:

dict - 返回dict, key为找到的文件路径,value为包含目标字符串的文本内容(仅第一次出现的位置)

find_dir_include_str

dramkit.iotools.find_dir_include_str(target_str, root_dir=None, file_types=None, re_match=False, return_all_find=False, logger=None, **kwargs_getpath)

在指定目录下的文件中,查找哪些文件里面包含了目标字符串

Todo

增加排除文件类型设置

Parameters:
  • target_str (str) – 目标字符串

  • root_dir (str, None) – 目标文件夹,目标字符串在此文件夹及其所有子文件内所有文本文件中搜索 若为None,则在os.getcwd()下搜索

  • file_types (None, str, list) –

    指定查找的文件后缀范围:

    • None, 在所有文件中查找

    • str, 指定一类文件后缀, 如’.py’表示在Python脚本中查找

    • list, 指定一个后缀列表, 如[‘.py’, ‘.txt’]

  • re_match (bool) – 若为True,则目标字符串 target_str 按正则表达式处理

  • return_all_find (bool) – 若为True,则返回所有找到的目标字符串,否则只返回第一个

  • logger (None, logging.Logger) – 日志记录器

Returns:

dict - 返回dict, key为找到的文件路径,value为包含目标字符串的文本内容(仅第一次出现的位置)

replace_str_in_file

dramkit.iotools.replace_str_in_file(fpath, ori_str, new_str, fpath_new=None, kwargs_read={}, kwargs_write={})
替换文本文件fpath中的ori_str为new_str
fpath_new指定新文件路径
若fpath_new为None,则新文件在原文件名加后缀_new
若fpath_new为’replace’,则新文件替换原文件
若fpath_new是函数,则新文件璐姐为fpath_new(fpath)
kwargs_read为read_lines函数接收参数
keargs_write为write_txt函数接收参数

replace_str_in_files

dramkit.iotools.replace_str_in_files(fpaths, ori_str, new_str, fpath_new=None, kwargs_read={}, kwargs_write={})

文本文件内容批量替换,参数见 replace_str_in_file()

get_pip_pkgs_win_deprecated

dramkit.iotools.get_pip_pkgs_win_deprecated()

获取windows下pip安装包列表

get_pip_pkgs_win

dramkit.iotools.get_pip_pkgs_win()

获取windows下pip安装包列表

get_filedir

dramkit.iotools.get_filedir(fpath)

获取fpath路径所在文件夹路径

get_filename

dramkit.iotools.get_filename(fpath, with_ext=True)

获取文件名(去除前缀路径),with_ext为False则返回不带后缀

get_file_ext_type

dramkit.iotools.get_file_ext_type(fpath)

提取文件扩展名

get_parent_path

dramkit.iotools.get_parent_path(path, n=1)

获取路径path的n级父路径