bypy/util.py
#!/usr/bin/env python
# encoding: utf-8
# PYTHON_ARGCOMPLETE_OK
# from __future__ imports must occur at the beginning of the file
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
### imports
import os
import sys
import time
import io
import json
import pprint
import codecs
import threading
import traceback
import shutil
# unify Python 2 and 3
if sys.version_info[0] == 2:
    # Python 2 ships the queue class in the 'Queue' module
    from Queue import Queue
elif sys.version_info[0] == 3:
    # these Python 2 builtin names do not exist on Python 3; alias them to
    # their Python 3 counterparts so the rest of the module can use them
    unicode = str
    basestring = str
    long = int
    raw_input = input
    from queue import Queue
from . import const
from . import printer_console
from .printer_util import (iswindows, human_size, interpret_size)
from .printer import (
bannerwarn, plog, pdbg, pinfo, pwarn, perr)
# short module-level aliases for the console printer helpers
pr = printer_console.pr
prcolor = printer_console.prcolor
ask = printer_console.ask
pprgr = printer_console.pprgr
# bare references to imported names; they have no effect at runtime
# NOTE(review): presumably these keep linters from flagging the imports as
# unused (the names may be re-exported to other modules) -- confirm
human_size
interpret_size
plog
pdbg
pinfo
pwarn
def remove_backslash(s):
    """Return *s* with every escaped slash ``\\/`` turned into a plain ``/``."""
    escaped_slash = r'\/'
    return r'/'.join(s.split(escaped_slash))

# short alias
rb = remove_backslash
# no idea who screws the sys.stdout.encoding
# the locale is 'UTF-8', sys.stdin.encoding is 'UTF-8',
# BUT, sys.stdout.encoding is None ...
def fixenc(stdenc):
    """Work around a stdout/stderr encoding that cannot encode CJK text.

    On Windows only a warning banner is shown. Elsewhere ``sys.stdout`` and
    ``sys.stderr`` are replaced by codec writers for *stdenc*, falling back
    to 'utf-8' when *stdenc* is empty.
    """
    if iswindows():
        bannerwarn("WARNING: StdOut encoding '{}' is unable to encode CJK strings.\n" \
            "Files with non-ASCII names may not be handled correctly.".format(stdenc))
        return

    # fix by @xslidian
    if not stdenc:
        stdenc = 'utf-8'
    make_writer = codecs.getwriter(stdenc)
    # A codec StreamWriter encodes the text itself and passes *bytes* to the
    # stream it wraps. Python 3 standard streams are text streams that reject
    # bytes, so wrap their binary 'buffer' when they have one. Streams
    # without a 'buffer' attribute (e.g. on Python 2) are wrapped directly.
    sys.stdout = make_writer(getattr(sys.stdout, 'buffer', sys.stdout))
    sys.stderr = make_writer(getattr(sys.stderr, 'buffer', sys.stderr))
# http://stackoverflow.com/questions/9403986/python-3-traceback-fails-when-no-exception-is-active
def formatex(ex):
    """Describe exception *ex* together with the current call stack.

    Returns an empty string when *ex* is falsy or is not an ``Exception``
    instance. The stack shown comes from ``traceback.format_stack()``, i.e.
    the stack at the point of this call.
    """
    if not (ex and isinstance(ex, Exception)):
        return ''
    stack_text = ''.join(traceback.format_stack())
    return "Exception:\n{} - {}\nStack:\n{}".format(type(ex), ex, stack_text)
# marshaling
def str2bool(s):
    """Interpret a string as a boolean; pass non-strings through untouched.

    A string is true when its first character, lower-cased, is 't', 'y' or
    a digit from '1' to '9'. Any other string, including the empty one, is
    false.
    """
    if not isinstance(s, basestring):
        # don't change
        return s
    if not s:
        return False
    first = s.lower()[0]
    return first in ('t', 'y') or '1' <= first <= '9'
def str2int(s):
    """Convert a string to ``int``; return any non-string value unchanged."""
    # don't change values that are not strings
    return int(s) if isinstance(s, basestring) else s
def str2float(s):
    """Convert a string to ``float``; return any non-string value unchanged."""
    # don't change values that are not strings
    return float(s) if isinstance(s, basestring) else s
# guarantee no-exception
def copyfile(src, dst):
    """Copy file *src* to *dst* with ``shutil.copyfile``.

    Returns ``const.ENoError`` on success. A ``shutil.Error`` or ``IOError``
    is reported through ``perr`` and turned into
    ``const.EFailToCreateLocalFile``.
    """
    try:
        shutil.copyfile(src, dst)
    except (shutil.Error, IOError) as ex:
        failure = "Fail to copy '{}' to '{}'.\n{}".format(
            src, dst, formatex(ex))
        perr(failure)
        return const.EFailToCreateLocalFile
    return const.ENoError
def movefile(src, dst):
    """Move *src* to *dst* with ``shutil.move``.

    Returns ``const.ENoError`` on success. A ``shutil.Error``, ``IOError``
    or ``OSError`` is reported through ``perr`` and turned into
    ``const.EFailToCreateLocalFile``.
    """
    result = const.ENoError
    try:
        shutil.move(src, dst)
    # IOError is listed explicitly: on Python 2 it is not a subclass of
    # OSError, and shutil.move can raise it when it falls back to copying
    # the file (on Python 3 both names refer to the same class).
    except (shutil.Error, IOError, OSError) as ex:
        perr("Fail to move '{}' to '{}'.\n{}".format(
            src, dst, formatex(ex)))
        result = const.EFailToCreateLocalFile
    return result
def removefile(path, verbose = False):
    """Delete the local file *path* with ``os.remove``.

    An empty *path* is silently skipped. When *verbose* is true the action
    is announced through ``pr`` first. Returns ``const.ENoError`` on
    success; any ``Exception`` is reported through ``perr`` and turned into
    ``const.EFailToDeleteFile``.
    """
    result = const.ENoError
    try:
        if verbose:
            pr("Removing local file '{}'".format(path))
        if path:
            os.remove(path)
    except Exception as ex:
        perr("Fail to remove local file '{}'.\n{}".format(
            path, formatex(ex)))
        result = const.EFailToDeleteFile
    return result
def removedir(path, verbose = False):
    """Delete the local directory tree *path* with ``shutil.rmtree``.

    An empty *path* is silently skipped. When *verbose* is true the action
    is announced through ``pr`` first. Returns ``const.ENoError`` on
    success; any ``Exception`` is reported through ``perr`` and turned into
    ``const.EFailToDeleteDir``.
    """
    try:
        if verbose:
            pr("Removing local directory '{}'".format(path))
        if path:
            shutil.rmtree(path)
    except Exception as ex:
        failure = "Fail to remove local directory '{}'.\n{}".format(
            path, formatex(ex))
        perr(failure)
        return const.EFailToDeleteDir
    return const.ENoError
def removepath(path):
    """Remove *path*, whether it is a directory or a regular file.

    Directories go through ``removedir`` and files through ``removefile``;
    their result code is returned. Anything else is reported through
    ``perr`` and yields ``const.EArgument``.
    """
    if os.path.isdir(path):
        return removedir(path)
    if os.path.isfile(path):
        return removefile(path)
    perr("Can't remove '{}', it's non-file and none-dir.".format(path))
    return const.EArgument
def makedir(path, mode = 0o777, verbose = False):
    """Create the local directory *path* (and missing parents) if absent.

    When *verbose* is true the action is announced through ``pr``. Nothing
    is created for an empty *path* or a path that already exists. Returns
    ``const.ENoError`` unless ``os.makedirs`` raises ``os.error``, in which
    case the error is reported through ``perr`` and
    ``const.EFailToCreateLocalDir`` is returned.
    """
    if verbose:
        pr("Creating local directory '{}'".format(path))
    if not path or os.path.exists(path):
        return const.ENoError
    try:
        os.makedirs(path, mode)
    except os.error as ex:
        failure = "Failed at creating local dir '{}'.\n{}".format(
            path, formatex(ex))
        perr(failure)
        return const.EFailToCreateLocalDir
    return const.ENoError
# guarantee no-exception
def getfilesize(path):
    """Return ``os.path.getsize(path)``, or -1 when that raises ``os.error``.

    The failure is reported through ``perr``.
    """
    size = -1
    try:
        size = os.path.getsize(path)
    except os.error as ex:
        perr("Exception occurred while getting size of '{}'.\n{}".format(
            path, formatex(ex)))
    return size
# guarantee no-exception
def getfilemtime(path):
    """Return ``os.path.getmtime(path)``, or -1 when that raises ``os.error``.

    The failure is reported through ``perr``.
    """
    mtime = -1
    try:
        mtime = os.path.getmtime(path)
    except os.error as ex:
        perr("Exception occurred while getting modification time of '{}'.\n{}".format(
            path, formatex(ex)))
    return mtime
def getfilemtime_int(path):
    """Return the result of ``getfilemtime(path)`` truncated to an ``int``."""
    # just int it, this is reliable no matter how stat_float_times() is changed
    mtime = getfilemtime(path)
    return int(mtime)
# mtime = getfilemtime(path)
# if (mtime == -1):
# return mtime
#
# if os.stat_float_times():
# mtime = int(mtime)
#
# return mtime
# seems os.path.join() doesn't handle Unicode well
def joinpath(first, second, sep = os.sep):
    """Join two path pieces with exactly one *sep* between them.

    Trailing separators of *first* and leading separators of *second* are
    stripped before joining. An empty *first* contributes nothing; a
    non-empty *first* always ends with *sep*, even when *second* is empty.
    """
    head = first.rstrip(sep) + sep if first else ''
    tail = second.lstrip(sep) if second else ''
    return head + tail
# CAN Python make Unicode right?
# http://houtianze.github.io/python/unicode/json/2016/01/03/another-python-unicode-fisaco-on-json.html
def jsondump_actual(data, f):
    """Serialize *data* as JSON into the open text file object *f*.

    Output keeps non-ASCII characters as they are, sorts keys and indents by
    two spaces. On Python 2 the JSON text is converted to ``unicode`` before
    it is written; on Python 3 ``json.dump`` writes to *f* directly.
    """
    major = sys.version_info[0]
    options = dict(ensure_ascii = False, sort_keys = True, indent = 2)
    if major == 2:
        text = json.dumps(data, **options)
        f.write(unicode(text))
    elif major == 3:
        json.dump(data, f, **options)
# no try ... except protection, will throw exceptions
def jsondump(data, filename, semaphore):
    """Write *data* as JSON to *filename*, opened as UTF-8 text.

    When *semaphore* is truthy it is held (as a context manager) for the
    whole write; otherwise the file is written without any locking.
    """
    def _write():
        with io.open(filename, 'w', encoding = 'utf-8') as f:
            jsondump_actual(data, f)

    if not semaphore:
        _write()
        return
    with semaphore:
        _write()
def jsondump_no_exception(data, filename, semaphore):
    """Call ``jsondump`` and report any ``Exception`` through ``perr``.

    Nothing is returned and the exception is not re-raised.
    """
    try:
        jsondump(data, filename, semaphore)
    except Exception as ex:
        template = "Fail to dump json '{}' to file '{}'.\nException:\n{}"
        perr(template.format(data, filename, formatex(ex)))
# no try ... except protection, will throw exceptions
def jsonload(filename):
    """Read *filename* as UTF-8 text and return the parsed JSON value."""
    with io.open(filename, 'r', encoding = 'utf-8') as stream:
        parsed = json.load(stream)
    return parsed
def jsonload_no_exception(filename):
    """Load *filename* as JSON without letting an ``Exception`` escape.

    Returns the parsed value from ``jsonload``. When loading or parsing
    fails, the error is reported through ``perr`` and an empty dict is
    returned instead.
    """
    try:
        # hand the parsed data back to the caller
        return jsonload(filename)
    # In `python 3`, the exception when failing to parse is `json.JSONDecodeError` (subclass of `ValueError`)
    # but in `python 2`, it's just `ValueError`
    except Exception as ex:
        perr("Fail to load '{}' as json, exception:\n{}".format(filename, formatex(ex)))
        return {}
def ls_type(isdir):
    """Return the listing marker: 'D' when *isdir* is truthy, else 'F'."""
    if isdir:
        return 'D'
    return 'F'
def ls_time(itime):
    """Format the timestamp *itime* in local time as 'YYYY-MM-DD, HH:MM:SS'."""
    local = time.localtime(itime)
    return time.strftime('%Y-%m-%d, %H:%M:%S', local)
# no leading, trailing '/'
# remote path rule:
# - all public methods of ByPy shall accept remote path as "partial path"
#   (before calling get_pcs_path())
# - all private methods of ByPy shall accept remote path as "full path"
#   (after calling get_pcs_path())
def get_pcs_path(path):
    """Turn a partial remote *path* into a full path under ``const.AppPcsPath``.

    An empty path, '/' or '\\' maps to ``const.AppPcsPath`` itself. Otherwise
    the path is stripped of leading/trailing '/' and appended to
    ``const.AppPcsPath`` after a '/'; any trailing '/' is removed from the
    result.
    """
    root = const.AppPcsPath
    if not path or path == '/' or path == '\\':
        return root
    full = root + '/' + path.strip('/')
    return full.rstrip('/')
def is_pcs_root_path(path):
    """Tell whether *path* equals ``const.AppPcsPath``, with or without a trailing '/'."""
    root = const.AppPcsPath
    return path == root or path == root + '/'
def print_pcs_list_bare(list):
    """Print one line per entry of *list* through ``pr``.

    Each entry is indexed by 'isdir', 'path', 'size', 'ctime' and 'mtime';
    'md5' is optional and printed as an empty string when missing. A falsy
    *list* prints nothing.
    """
    if not list:
        return
    row = "{} {} {} {} {} {}"
    for entry in list:
        pr(row.format(
            ls_type(entry['isdir']),
            entry['path'],
            entry['size'],
            ls_time(entry['ctime']),
            ls_time(entry['mtime']),
            entry['md5'] if 'md5' in entry else ''))
def print_pcs_list(json, foundmsg = "Found:", notfoundmsg = "Nothing found."):
    """Print the entries stored under ``json['list']``.

    When the list is non-empty, *foundmsg* is printed followed by the
    entries (via ``print_pcs_list_bare``); otherwise *notfoundmsg* is
    printed.
    """
    entries = json['list']
    if not entries:
        pr(notfoundmsg)
        return
    pr(foundmsg)
    print_pcs_list_bare(entries)
# https://stackoverflow.com/questions/10883399/unable-to-encode-decode-pprint-output
class MyPrettyPrinter(pprint.PrettyPrinter):
    """PrettyPrinter that emits text strings as they are instead of their repr."""

    def format(self, obj, context, maxlevels, level):
        """Return the ``(text, readable, recursive)`` triple for *obj*.

        Text strings are returned unchanged. Byte strings that
        ``codecs.decode`` cannot decode are rendered with a '0x' prefix.
        Everything else is delegated to ``pprint.PrettyPrinter.format``.
        """
        if isinstance(obj, unicode):
            return (obj, True, False)
        if isinstance(obj, bytes):
            try:
                codecs.decode(obj)
            # only ordinary errors mean "not decodable"; KeyboardInterrupt
            # and SystemExit must keep propagating
            except Exception:
                return ("0x{}".format(obj), True, False)
        return pprint.PrettyPrinter.format(self, obj, context, maxlevels, level)
class NewThread(threading.Thread):
    """Thread whose ``run`` simply calls the function given at construction."""

    def __init__(self, func):
        super(NewThread, self).__init__()
        # callable invoked with no arguments by run()
        self.func = func

    def run(self):
        target = self.func
        target()
def startthread(func):
    """Start a ``NewThread`` that runs *func*; the thread object is not returned."""
    worker = NewThread(func)
    worker.start()
def inc_list_size(li, size = 3, filler = 0):
    """Pad the list *li* in place with *filler* until it has *size* items.

    A list that is already at least *size* long is left untouched. Nothing
    is returned.
    """
    while len(li) < size:
        li.append(filler)
def comp_semver(v1, v2):
    """Compare two dotted numeric version strings on their first three parts.

    Versions with fewer than three parts are padded with zeros. Returns the
    difference of the first differing pair of parts (negative when *v1* is
    lower, positive when it is higher), or 0 when the first three parts are
    equal.
    """
    raw1 = v1.split('.')
    raw2 = v2.split('.')
    nums1 = list(map(int, raw1))
    nums2 = list(map(int, raw2))
    inc_list_size(nums1, 3)
    inc_list_size(nums2, 3)
    for left, right in zip(nums1[:3], nums2[:3]):
        if left != right:
            return left - right
    return 0
# NOT in use, see deque
class FixedSizeQueue(object):
    """Queue wrapper that drops one queued item before adding to a full queue."""

    def __init__(self, size = 1024):
        self.q = Queue()
        # number of items at which put() starts discarding
        self.size = size

    def put(self, item):
        """Add *item*, first removing one item if *size* has been reached."""
        is_full = self.q.qsize() >= self.size
        if is_full:
            self.q.get()
        self.q.put(item)

    def get(self):
        """Remove and return the next item from the underlying queue."""
        item = self.q.get()
        return item
def nop(*args):
    """Accept any positional arguments and do nothing."""
    return None
# vim: tabstop=4 noexpandtab shiftwidth=4 softtabstop=4 ff=unix fileencoding=utf-8