-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
195 lines (158 loc) · 5.56 KB
/
utils.py
File metadata and controls
195 lines (158 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import concurrent.futures
import difflib
import hashlib
import json
import logging
import multiprocessing
import os
import pickle
import queue
import signal
import subprocess
import sys
import time
import xml.etree.ElementTree
from functools import wraps

import pandas as pd
def get_logger_for_file(loglevel=logging.ERROR, logname=__name__):
    """Return a named logger that writes to stderr at the given level.

    A stream handler is attached only when the logger (or its ancestors)
    has none yet, so calling this repeatedly with the same name does not
    duplicate log output.
    """
    log = logging.getLogger(logname)
    if not log.hasHandlers():
        stream_handler = logging.StreamHandler(sys.stderr)
        stream_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(levelname)s %(funcName)s : %(message)s'))
        log.addHandler(stream_handler)
    log.setLevel(loglevel)
    return log
# Module-wide log level; the module logger below is created with it.
loglevel = logging.DEBUG
# General-purpose logger for this module.
_logger = get_logger_for_file(loglevel)
# time logger is always in debug mode
_time_logger = get_logger_for_file(logging.DEBUG, 'timing')
def run_cmd(cmd, shell=True, timeout=0.0):
    """Run a command and capture its combined stdout/stderr.

    Args:
        cmd: command string (or argv list when shell=False).
        shell: passed through to subprocess; True runs via the shell.
        timeout: seconds; values <= 0 mean no timeout.

    Returns:
        (returncode, output_text) where returncode is 0 on success,
        the child's exit code on failure, and -1 on timeout.
    """
    _logger.info(f'run cmd: {cmd}')
    try:
        ret = 0
        if timeout <= 0.0:
            output = subprocess.check_output(cmd,
                                             shell=shell,
                                             stderr=subprocess.STDOUT)
        else:
            output = subprocess.check_output(cmd,
                                             shell=shell,
                                             stderr=subprocess.STDOUT,
                                             timeout=timeout)
    except subprocess.CalledProcessError as e:
        _logger.error(f'Error: {cmd}')
        ret = e.returncode
        output = e.output
    except subprocess.TimeoutExpired as e:
        _logger.error(f'Timeout: {cmd}')
        ret = -1
        # TimeoutExpired.output is None when the child produced no output
        # before the deadline; decoding None would raise AttributeError.
        output = e.output if e.output is not None else b''
    return ret, output.decode('utf-8')
def init_dir_for_dump(dump_path):
    """Ensure the parent directory of `dump_path` exists.

    A bare filename (no '/' or '\\') needs no directory and is returned
    unchanged; otherwise the parent directory (with any intermediates) is
    created and None is returned, matching the original contract.
    """
    if '/' not in dump_path and '\\' not in dump_path:
        return dump_path
    dump_dir = os.path.dirname(dump_path)
    # exist_ok=True avoids the check-then-create race the old
    # `if not exists: makedirs` pattern had under concurrent callers.
    os.makedirs(dump_dir, exist_ok=True)
def load_json(json_path):
    """Deserialize the JSON file at `json_path` and return the object."""
    with open(json_path, 'r') as fp:
        return json.load(fp)
def dump_json(obj, dump_path, indent=1):
    """Serialize `obj` as JSON to `dump_path`, creating parent dirs first."""
    init_dir_for_dump(dump_path)
    with open(dump_path, 'w') as fp:
        json.dump(obj, fp, indent=indent)
def load_pkl(pickle_path):
    """Load and return the pickled object stored at `pickle_path`.

    NOTE: unpickling untrusted data is unsafe; only use on trusted files.
    """
    with open(pickle_path, 'rb') as fp:
        return pickle.load(fp)
def dump_pkl(obj, dump_path):
    """Pickle `obj` to `dump_path` (protocol 4), creating parent dirs first."""
    init_dir_for_dump(dump_path)
    with open(dump_path, 'wb') as fp:
        pickle.dump(obj, fp, protocol=4)
def load_csv(csv_path, **kwargs):
    """Load a CSV into a pandas DataFrame, logging the path on failure.

    Extra keyword arguments are forwarded to `pd.read_csv`. Any read
    error is logged and re-raised to the caller.
    """
    try:
        return pd.read_csv(csv_path, **kwargs)
    except Exception:
        _logger.error(f'Fail to load {csv_path}')
        # bare `raise` preserves the original traceback; `raise e` would
        # re-anchor it at this line.
        raise
def get_str_similar(str1, str2):
    """Return the difflib similarity ratio in [0.0, 1.0] for two strings."""
    matcher = difflib.SequenceMatcher(a=str1, b=str2)
    return matcher.ratio()
def timing(f):
    """Decorator that logs the wall-clock duration of each call to `f`.

    Durations under a minute are reported in seconds, under an hour in
    minutes, otherwise in hours. The wrapped function's return value is
    passed through unchanged.
    """
    @wraps(f)
    def wrap(*args, **kw):
        ts = time.time()
        result = f(*args, **kw)
        te = time.time()
        cost = te - ts
        if cost < 60:
            _time_logger.debug('func:%r took: %2.4f sec' % \
              (f.__name__, cost))
        elif cost < 3600:
            # was `60 < cost < 3600`, which mis-classified an exact
            # cost of 60s into the "hours" branch below
            _time_logger.debug('func:%r took: %2.4f min' % \
              (f.__name__, cost / 60))
        else:
            _time_logger.debug('func:%r took: %2.4f hours' % \
              (f.__name__, cost / 3600))
        return result
    return wrap
def get_all_loggers():
    """Return a list of every logger registered with the logging module."""
    return [logging.getLogger(name)
            for name in logging.root.manager.loggerDict]
def get_file_hash(file_path, chunk_size=1 << 20):
    """Return the MD5 hex digest of a file's contents.

    Reads the file in `chunk_size`-byte chunks (default 1 MiB) so that
    arbitrarily large files can be hashed without loading them fully
    into memory. Backward-compatible: single-argument calls behave as
    before.
    """
    digest = hashlib.md5()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
def timeout_run_function(func_ptr, args, timeout):
    """Run `func_ptr(*args)` in a worker thread with a result deadline.

    Raises concurrent.futures.TimeoutError if no result arrives within
    `timeout` seconds — callers must catch it.

    The previous `with ThreadPoolExecutor(...)` form defeated the timeout:
    the context manager's exit calls shutdown(wait=True), which blocks
    until the worker thread finishes even after future.result() has
    already timed out. shutdown(wait=False) returns control immediately;
    note the worker thread itself cannot be killed and keeps running in
    the background until it completes.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func_ptr, *args)
        return future.result(timeout=timeout)
    finally:
        executor.shutdown(wait=False)
class TimeoutException(Exception):
    """Raised when a guarded operation exceeds its time budget
    (used by the `timeout` decorator and `eval_with_timeout` below)."""
    pass
def timeout(seconds=5, error_message="Function call timed out"):
    """
    A decorator that raises a TimeoutException if the decorated function
    runs longer than `seconds`.

    Implemented with SIGALRM, so it only works on Unix and only in the
    main thread; `seconds` must be a whole number (signal.alarm takes int).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Define a signal handler that raises the timeout exception
            def _handle_timeout(signum, frame):
                raise TimeoutException(error_message)
            # Install our handler, remembering whatever was there before
            old_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel the alarm and restore the previous handler so
                # nested use or other SIGALRM users are not clobbered
                # (the old code left our handler installed permanently).
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        return wrapper
    return decorator
def _eval_for_timeout(code, queue):
try:
result = eval(code)
queue.put((True, result))
except Exception as e:
queue.put((False, e))
def eval_with_timeout(code, timeout):
    """Evaluate `code` in a child process with a hard deadline.

    Returns the evaluated value, re-raises any exception the expression
    raised, and raises TimeoutException if the child is still running
    after `timeout` seconds. NOTE: eval on untrusted input is unsafe —
    only pass trusted code.
    """
    result_queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=_eval_for_timeout,
                                      args=(code, result_queue))
    process.start()
    process.join(timeout=timeout)
    if process.is_alive():
        process.terminate()
        process.join()
        raise TimeoutException('Timeout for eval')
    try:
        # The child may have died (e.g. killed by a signal) without putting
        # a result; a bare get() would then block forever.
        succ, res = result_queue.get(timeout=1)
    except queue.Empty:
        raise RuntimeError(
            f'eval worker exited with code {process.exitcode} '
            f'without reporting a result')
    if succ:
        return res
    else:
        raise res