lib.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import os
  2. import sys
  3. import locale
  4. import random
  5. import time
  6. import signal
  7. from contextlib import contextmanager
  8. @contextmanager
  9. def default_sigint():
  10. original_sigint_handler = signal.getsignal(signal.SIGINT)
  11. signal.signal(signal.SIGINT, signal.SIG_DFL)
  12. try:
  13. yield
  14. except:
  15. raise
  16. finally:
  17. signal.signal(signal.SIGINT, original_sigint_handler)
  18. def to_utf8(s):
  19. """Re-encode string from the default system encoding to UTF-8."""
  20. current = locale.getpreferredencoding()
  21. return s.decode(current).encode("UTF-8") if s and current != "UTF-8" else s
  22. def debug(obj, fd=sys.stderr):
  23. """Write obj to standard error."""
  24. string = str(obj.encode(get_encoding(fd), "backslashreplace")
  25. if isinstance(obj, unicode) else obj)
  26. fd.write(string + "\n")
  27. def catch_exceptions(exit_codes, fun, *args, **kwargs):
  28. """
  29. Catch exceptions on fun(*args, **kwargs) and return the exit code specified
  30. in the exit_codes dictionary. Return 0 if no exception is raised.
  31. """
  32. try:
  33. fun(*args, **kwargs)
  34. return 0
  35. except tuple(exit_codes.keys()) as exc:
  36. debug("[%s] %s" % (exc.__class__.__name__, exc))
  37. return exit_codes[exc.__class__]
  38. def get_encoding(fd):
  39. """Guess terminal encoding."""
  40. return fd.encoding or locale.getpreferredencoding()
  41. def first(it):
  42. """Return first element in iterable."""
  43. return it.next()
  44. def string_to_dict(string):
  45. """Return dictionary from string "key1=value1, key2=value2"."""
  46. if string:
  47. pairs = [s.strip() for s in string.split(",")]
  48. return dict(pair.split("=") for pair in pairs)
  49. def get_first_existing_filename(prefixes, relative_path):
  50. """Get the first existing filename of relative_path seeking on prefixes directories."""
  51. for prefix in prefixes:
  52. path = os.path.join(prefix, relative_path)
  53. if os.path.exists(path):
  54. return path
  55. def retriable_exceptions(fun, retriable_exceptions, max_retries=None):
  56. """Run function and retry on some exceptions (with exponential backoff)."""
  57. retry = 0
  58. while 1:
  59. try:
  60. return fun()
  61. except tuple(retriable_exceptions) as exc:
  62. retry += 1
  63. if type(exc) not in retriable_exceptions:
  64. raise exc
  65. elif max_retries is not None and retry > max_retries:
  66. debug("Retry limit reached, time to give up")
  67. raise exc
  68. else:
  69. seconds = random.uniform(0, 2**retry)
  70. message = ("[Retryable error {current_retry}/{total_retries}] " +
  71. "{error_type} ({error_msg}). Wait {wait_time} seconds").format(
  72. current_retry=retry,
  73. total_retries=max_retries or "-",
  74. error_type=type(exc).__name__,
  75. error_msg=str(exc) or "-",
  76. wait_time="%.1f" % seconds,
  77. )
  78. debug(message)
  79. time.sleep(seconds)