ansistrm.py 7.3 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Author: hywell
  5. @Email: hywell.28@gmail.com
  6. @Blog: iassas.com
  7. @Date: 2019/10/16 13:53
  8. """
  9. import logging
  10. import re
  11. import sys
  12. from lib.core.setting import IS_WIN
  13. from lib.core.convert import stdout_encode
  14. if IS_WIN:
  15. import ctypes
  16. import ctypes.wintypes
  17. # Reference: https://gist.github.com/vsajip/758430
  18. # https://github.com/ipython/ipython/issues/4252
  19. # https://msdn.microsoft.com/en-us/library/windows/desktop/ms686047%28v=vs.85%29.aspx
  20. ctypes.windll.kernel32.SetConsoleTextAttribute.argtypes = [ctypes.wintypes.HANDLE, ctypes.wintypes.WORD]
  21. ctypes.windll.kernel32.SetConsoleTextAttribute.restype = ctypes.wintypes.BOOL
  22. class ColorizingStreamHandler(logging.StreamHandler):
  23. # color names to indices
  24. color_map = {
  25. 'black': 0,
  26. 'red': 1,
  27. 'green': 2,
  28. 'yellow': 3,
  29. 'blue': 4,
  30. 'magenta': 5,
  31. 'cyan': 6,
  32. 'white': 7,
  33. }
  34. # levels to (background, foreground, bold/intense)
  35. level_map = {
  36. logging.DEBUG: (None, 'blue', False),
  37. logging.INFO: (None, 'green', False),
  38. logging.WARNING: (None, 'yellow', False),
  39. logging.ERROR: (None, 'red', False),
  40. logging.CRITICAL: ('red', 'white', False)
  41. }
  42. csi = '\x1b['
  43. reset = '\x1b[0m'
  44. bold = "\x1b[1m"
  45. disable_coloring = False
  46. @property
  47. def is_tty(self):
  48. isatty = getattr(self.stream, 'isatty', None)
  49. return isatty and isatty() and not self.disable_coloring
  50. def emit(self, record):
  51. try:
  52. message = stdout_encode(self.format(record))
  53. message = message.decode()
  54. stream = self.stream
  55. if not self.is_tty:
  56. if message and message[0] == "\r":
  57. message = message[1:]
  58. stream.write(message)
  59. else:
  60. self.output_colorized(message)
  61. stream.write(getattr(self, 'terminator', '\n'))
  62. self.flush()
  63. except (KeyboardInterrupt, SystemExit):
  64. raise
  65. except IOError:
  66. pass
  67. except:
  68. self.handleError(record)
  69. if not sys.platform == 'win32':
  70. def output_colorized(self, message):
  71. self.stream.write(message)
  72. else:
  73. ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m')
  74. nt_color_map = {
  75. 0: 0x00, # black
  76. 1: 0x04, # red
  77. 2: 0x02, # green
  78. 3: 0x06, # yellow
  79. 4: 0x01, # blue
  80. 5: 0x05, # magenta
  81. 6: 0x03, # cyan
  82. 7: 0x07, # white
  83. }
  84. def output_colorized(self, message):
  85. parts = self.ansi_esc.split(message)
  86. write = self.stream.write
  87. h = None
  88. fd = getattr(self.stream, 'fileno', None)
  89. if fd is not None:
  90. fd = fd()
  91. if fd in (1, 2): # stdout or stderr
  92. h = ctypes.windll.kernel32.GetStdHandle(-10 - fd)
  93. while parts:
  94. text = parts.pop(0)
  95. if text:
  96. write(text)
  97. if parts:
  98. params = parts.pop(0)
  99. if h is not None:
  100. params = [int(p) for p in params.split(';')]
  101. color = 0
  102. for p in params:
  103. if 40 <= p <= 47:
  104. color |= self.nt_color_map[p - 40] << 4
  105. elif 30 <= p <= 37:
  106. color |= self.nt_color_map[p - 30]
  107. elif p == 1:
  108. color |= 0x08 # foreground intensity on
  109. elif p == 0: # reset to default color
  110. color = 0x07
  111. else:
  112. pass # error condition ignored
  113. ctypes.windll.kernel32.SetConsoleTextAttribute(h, color)
  114. def colorize(self, message, levelno):
  115. if levelno in self.level_map and self.is_tty:
  116. bg, fg, bold = self.level_map[levelno]
  117. params = []
  118. if bg in self.color_map:
  119. params.append(str(self.color_map[bg] + 40))
  120. if fg in self.color_map:
  121. params.append(str(self.color_map[fg] + 30))
  122. if bold:
  123. params.append('1')
  124. if params and message:
  125. match = re.search(r"\A(\s+)", message)
  126. prefix = match.group(1) if match else ""
  127. match = re.search(r"\[([A-Z ]+)\]", message) # log level
  128. if match:
  129. level = match.group(1)
  130. if message.startswith(self.bold):
  131. message = message.replace(self.bold, "")
  132. reset = self.reset + self.bold
  133. params.append('1')
  134. else:
  135. reset = self.reset
  136. message = message.replace(level, ''.join((self.csi, ';'.join(params), 'm', level, reset)), 1)
  137. match = re.search(r"\A\s*\[([\d:]+)\]", message) # time
  138. if match:
  139. time = match.group(1)
  140. if not message.endswith(self.reset):
  141. reset = self.reset
  142. elif self.bold in message: # bold
  143. reset = self.reset + self.bold
  144. else:
  145. reset = self.reset
  146. message = message.replace(time, ''.join(
  147. (self.csi, str(self.color_map["cyan"] + 30), 'm', time, reset)), 1)
  148. match = re.search(r"\[(#\d+)\]", message) # counter
  149. if match:
  150. counter = match.group(1)
  151. if not message.endswith(self.reset):
  152. reset = self.reset
  153. elif self.bold in message: # bold
  154. reset = self.reset + self.bold
  155. else:
  156. reset = self.reset
  157. message = message.replace(counter, ''.join(
  158. (self.csi, str(self.color_map["yellow"] + 30), 'm', counter, reset)), 1)
  159. if level != "PAYLOAD":
  160. for match in re.finditer(r"[^\w]'([^']+)'", message): # single-quoted
  161. string = match.group(1)
  162. if not message.endswith(self.reset):
  163. reset = self.reset
  164. elif self.bold in message: # bold
  165. reset = self.reset + self.bold
  166. else:
  167. reset = self.reset
  168. message = message.replace("'%s'" % string, "'%s'" % ''.join(
  169. (self.csi, str(self.color_map["white"] + 30), 'm', string, reset)), 1)
  170. else:
  171. message = ''.join((self.csi, ';'.join(params), 'm', message, self.reset))
  172. if prefix:
  173. message = "%s%s" % (prefix, message)
  174. return message
  175. def format(self, record):
  176. message = logging.StreamHandler.format(self, record)
  177. return self.colorize(message, record.levelno)