# helper.py -- source listing recovered from a web file viewer (the viewer's banner and menu text were page chrome, not part of the module).
# -*- test-case-name: twisted.conch.test.test_helper -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" Partial in-memory terminal emulator
@author: Jp Calderone """
import re import string
from zope.interface import implementer
from incremental import Version
from twisted.conch.insults import insults from twisted.internet import defer, protocol, reactor from twisted.logger import Logger from twisted.python import _textattributes from twisted.python.compat import iterbytes from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
# Base numbers for colour attributes: a colour index (see below) is added to
# one of these to obtain the numeric attribute used for a foreground or
# background colour.
FOREGROUND = 30
BACKGROUND = 40

# Colour indices 0 through 7, followed by N_COLORS (8), the number of colours.
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    WHITE,
    N_COLORS,
) = range(9)
class _FormattingState(_textattributes._FormattingStateMixin):
    """
    The formatting attributes attached to a single character cell.

    A character's attributes consist of its character set, whether it is
    bold, underlined, blinking or shown in reverse video, and its foreground
    and background colors.
    """

    # Names of the attributes listed for comparison purposes.
    compareAttributes = (
        "charset",
        "bold",
        "underline",
        "blink",
        "reverseVideo",
        "foreground",
        "background",
        "_subtracting",
    )

    def __init__(
        self,
        charset=insults.G0,
        bold=False,
        underline=False,
        blink=False,
        reverseVideo=False,
        foreground=WHITE,
        background=BLACK,
        _subtracting=False,
    ):
        # Character set first, then the boolean renditions, then the colors.
        self.charset = charset
        self.bold, self.underline = bold, underline
        self.blink, self.reverseVideo = blink, reverseVideo
        self.foreground, self.background = foreground, background
        self._subtracting = _subtracting

    @deprecated(Version("Twisted", 13, 1, 0))
    def wantOne(self, **kw):
        """
        Return a copy of this formatting state with one attribute changed.

        @param kw: A single keyword argument giving the attribute name and
            its new value.  One item is popped from it; an empty mapping
            makes C{popitem} raise L{KeyError}.

        @return: The result of C{self._withAttribute(name, value)}.

        @see: L{DefaultFormattingState._withAttribute}.
        """
        name, value = kw.popitem()
        return self._withAttribute(name, value)

    def toVT102(self):
        """
        Build the control sequence that selects every attribute held by this
        state, except for the character set.

        @return: A C{str} of the form C{"\\x1b[<n>;<n>...m"}, or the empty
            string when there is nothing to emit.
        """
        # A leading 0 is emitted first when this state is "subtracting".
        codes = [0] if self._subtracting else []

        # Boolean renditions, in the order they must appear in the sequence.
        renditions = (
            (self.bold, insults.BOLD),
            (self.underline, insults.UNDERLINE),
            (self.blink, insults.BLINK),
            (self.reverseVideo, insults.REVERSE_VIDEO),
        )
        codes.extend(code for enabled, code in renditions if enabled)

        # Colors are only emitted when they differ from white-on-black.
        if self.foreground != WHITE:
            codes.append(FOREGROUND + self.foreground)
        if self.background != BLACK:
            codes.append(BACKGROUND + self.background)

        if not codes:
            return ""
        return "\x1b[" + ";".join(str(code) for code in codes) + "m"
# Deprecated public alias for _FormattingState; the call below registers the
# deprecation (since Twisted 13.1.0) for the module attribute
# "twisted.conch.insults.helper.CharacterAttribute".
CharacterAttribute = _FormattingState

deprecatedModuleAttribute(
    Version("Twisted", 13, 1, 0),
    "Use twisted.conch.insults.text.assembleFormattedText instead.",
    "twisted.conch.insults.helper",
    "CharacterAttribute",
)
# XXX - need to support scroll regions and scroll history
@implementer(insults.ITerminalTransport)
class TerminalBuffer(protocol.Protocol):
    """
    An in-memory terminal emulator.

    The screen is kept in C{self.lines}: a list of C{height} rows, each a
    list of C{width} cells.  A cell is a C{(character, formattingState)}
    tuple, where the character is either a one-byte L{bytes} object or the
    L{void} sentinel for a cell that has never been written.

    @ivar x: The cursor column.
    @ivar y: The cursor row.
    @ivar keypadMode: C{"app"} or C{"num"}, as selected by
        L{applicationKeypadMode} / L{numericKeypadMode}.
    """

    # Create one unique sentinel object per special key as a class attribute
    # (UP_ARROW, DOWN_ARROW, ...).  The names come from the literal tuple
    # below, so nothing external is ever passed to exec().
    for keyID in (
        b"UP_ARROW",
        b"DOWN_ARROW",
        b"RIGHT_ARROW",
        b"LEFT_ARROW",
        b"HOME",
        b"INSERT",
        b"DELETE",
        b"END",
        b"PGUP",
        b"PGDN",
        b"F1",
        b"F2",
        b"F3",
        b"F4",
        b"F5",
        b"F6",
        b"F7",
        b"F8",
        b"F9",
        b"F10",
        b"F11",
        b"F12",
    ):
        execBytes = keyID + b" = object()"
        execStr = execBytes.decode("ascii")
        exec(execStr)

    TAB = b"\t"
    BACKSPACE = b"\x7f"

    # Screen dimensions, in character cells.
    width = 80
    height = 24

    # Byte substituted for never-written cells when rendering to bytes.
    fill = b" "
    # Sentinel stored as the character of a cell that has never been written.
    void = object()
    _log = Logger()

    # The bytes insertAtCursor treats as printable; encoded once here rather
    # than on every inserted byte.
    _printable = string.printable.encode("ascii")

    @staticmethod
    def _defaultGraphicRendition():
        """
        Return a fresh dictionary holding the default graphic rendition:
        every attribute off, white foreground, black background.
        """
        return {
            "bold": False,
            "underline": False,
            "blink": False,
            "reverseVideo": False,
            "foreground": WHITE,
            "background": BLACK,
        }

    def getCharacter(self, x, y):
        """
        Return the cell stored at column C{x} of row C{y}.

        @return: A C{(character, formattingState)} tuple.
        """
        return self.lines[y][x]

    def connectionMade(self):
        """
        Put the terminal into its initial state (see L{reset}).
        """
        self.reset()

    def write(self, data):
        """
        Add the given printable bytes to the terminal.

        Line feeds in L{bytes} will be replaced with carriage return / line
        feed pairs.
        """
        for b in iterbytes(data.replace(b"\n", b"\r\n")):
            self.insertAtCursor(b)

    def _currentFormattingState(self):
        """
        Build a L{_FormattingState} from the active character set and the
        current graphic rendition.
        """
        return _FormattingState(self.activeCharset, **self.graphicRendition)

    def insertAtCursor(self, b):
        """
        Add one byte to the terminal at the cursor and make consequent state
        updates.

        If b is a carriage return, move the cursor to the beginning of the
        current row.

        If b is a line feed, move the cursor to the next row or scroll down
        if the cursor is already in the last row.

        Otherwise, if b is printable, put it at the cursor position
        (inserting or overwriting as dictated by the current mode) and move
        the cursor.
        """
        if b == b"\r":
            self.x = 0
        elif b == b"\n":
            self._scrollDown()
        elif b in self._printable:
            # The cursor may sit one past the last column; wrap before
            # writing.
            if self.x >= self.width:
                self.nextLine()
            ch = (b, self._currentFormattingState())
            if self.modes.get(insults.modes.IRM):
                # Insert mode: shift the rest of the row right and drop the
                # cell pushed off the end so the row keeps its width.
                self.lines[self.y][self.x : self.x] = [ch]
                self.lines[self.y].pop()
            else:
                self.lines[self.y][self.x] = ch
            self.x += 1

    def _emptyLine(self, width):
        """
        Return a list of C{width} void cells carrying the current formatting
        state.
        """
        return [(self.void, self._currentFormattingState()) for i in range(width)]

    def _scrollDown(self):
        """
        Move the cursor down one row; when it would leave the screen, keep
        it on the last row, drop the first row and append an empty one.
        """
        self.y += 1
        if self.y >= self.height:
            self.y -= 1
            del self.lines[0]
            self.lines.append(self._emptyLine(self.width))

    def _scrollUp(self):
        """
        Move the cursor up one row; when it would leave the screen, keep it
        on the first row, drop the last row and insert an empty one at the
        top.
        """
        self.y -= 1
        if self.y < 0:
            self.y = 0
            del self.lines[-1]
            self.lines.insert(0, self._emptyLine(self.width))

    def cursorUp(self, n=1):
        """
        Move the cursor up C{n} rows, stopping at the first row.
        """
        self.y = max(0, self.y - n)

    def cursorDown(self, n=1):
        """
        Move the cursor down C{n} rows, stopping at the last row.
        """
        self.y = min(self.height - 1, self.y + n)

    def cursorBackward(self, n=1):
        """
        Move the cursor left C{n} columns, stopping at column 0.
        """
        self.x = max(0, self.x - n)

    def cursorForward(self, n=1):
        """
        Move the cursor right C{n} columns.

        The column is clamped to C{width}, i.e. it may end up one past the
        last cell; L{insertAtCursor} wraps to the next line in that case.
        """
        self.x = min(self.width, self.x + n)

    def cursorPosition(self, column, line):
        """
        Place the cursor at the given column and line.  No bounds checking
        is performed.
        """
        self.x = column
        self.y = line

    def cursorHome(self):
        """
        Move the cursor to the home position stored in C{self.home}.
        """
        self.x = self.home.x
        self.y = self.home.y

    def index(self):
        """
        Move the cursor down one row, scrolling if it is on the last row.
        """
        self._scrollDown()

    def reverseIndex(self):
        """
        Move the cursor up one row, scrolling if it is on the first row.
        """
        self._scrollUp()

    def nextLine(self):
        """
        Update the cursor position attributes and scroll down if
        appropriate.
        """
        self.x = 0
        self._scrollDown()

    def saveCursor(self):
        """
        Remember the current cursor position for L{restoreCursor}.
        """
        self._savedCursor = (self.x, self.y)

    def restoreCursor(self):
        """
        Restore the position remembered by L{saveCursor} and forget it.

        @raise AttributeError: If no position is currently saved.
        """
        self.x, self.y = self._savedCursor
        del self._savedCursor

    def setModes(self, modes):
        """
        Record each of the given modes as enabled.
        """
        for m in modes:
            self.modes[m] = True

    def resetModes(self, modes):
        """
        Record each of the given modes as disabled.  Modes which are not
        currently enabled are ignored.
        """
        for m in modes:
            self.modes.pop(m, None)

    def setPrivateModes(self, modes):
        """
        Enable the given modes.

        Track which modes have been enabled so that the implementations of
        other L{insults.ITerminalTransport} methods can be properly
        implemented to respect these settings.

        @see: L{resetPrivateModes}
        @see: L{insults.ITerminalTransport.setPrivateModes}
        """
        for m in modes:
            self.privateModes[m] = True

    def resetPrivateModes(self, modes):
        """
        Disable the given modes.

        @see: L{setPrivateModes}
        @see: L{insults.ITerminalTransport.resetPrivateModes}
        """
        for m in modes:
            self.privateModes.pop(m, None)

    def applicationKeypadMode(self):
        """
        Record that the keypad is in application mode.
        """
        self.keypadMode = "app"

    def numericKeypadMode(self):
        """
        Record that the keypad is in numeric mode.
        """
        self.keypadMode = "num"

    def selectCharacterSet(self, charSet, which):
        """
        Associate the character set C{charSet} with the slot C{which}.
        """
        self.charsets[which] = charSet

    def shiftIn(self):
        """
        Make G0 the active character set.
        """
        self.activeCharset = insults.G0

    def shiftOut(self):
        """
        Make G1 the active character set.
        """
        self.activeCharset = insults.G1

    def _singleShift(self, charset):
        """
        Make C{charset} the active character set for the next inserted byte
        only.

        L{insertAtCursor} is temporarily shadowed by an instance attribute
        which performs one insertion, removes itself and restores the
        previously active character set.
        """
        oldActiveCharset = self.activeCharset
        self.activeCharset = charset
        f = self.insertAtCursor

        def insertAtCursor(b):
            f(b)
            # Remove this one-shot override so the class's method is
            # visible again, then restore the previous character set.
            del self.insertAtCursor
            self.activeCharset = oldActiveCharset

        self.insertAtCursor = insertAtCursor

    def singleShift2(self):
        """
        Use G2 as the character set for the next inserted byte only.
        """
        self._singleShift(insults.G2)

    def singleShift3(self):
        """
        Use G3 as the character set for the next inserted byte only.
        """
        self._singleShift(insults.G3)

    def selectGraphicRendition(self, *attributes):
        """
        Apply the given graphic rendition attributes, in order.

        C{insults.NORMAL} resets every attribute to its default; the bold,
        underline, blink and reverse video constants switch the matching
        attribute on.  Any other value is converted with C{int} and treated
        as a foreground (C{FOREGROUND + color}) or background
        (C{BACKGROUND + color}) selection for one of the C{N_COLORS} colors.
        Anything else is logged as an error and skipped.
        """
        for a in attributes:
            if a == insults.NORMAL:
                self.graphicRendition = self._defaultGraphicRendition()
            elif a == insults.BOLD:
                self.graphicRendition["bold"] = True
            elif a == insults.UNDERLINE:
                self.graphicRendition["underline"] = True
            elif a == insults.BLINK:
                self.graphicRendition["blink"] = True
            elif a == insults.REVERSE_VIDEO:
                self.graphicRendition["reverseVideo"] = True
            else:
                try:
                    v = int(a)
                except ValueError:
                    self._log.error(
                        "Unknown graphic rendition attribute: {attr!r}", attr=a
                    )
                else:
                    # Valid color indices are 0 .. N_COLORS - 1, so the
                    # upper bounds are exclusive.
                    if FOREGROUND <= v < FOREGROUND + N_COLORS:
                        self.graphicRendition["foreground"] = v - FOREGROUND
                    elif BACKGROUND <= v < BACKGROUND + N_COLORS:
                        self.graphicRendition["background"] = v - BACKGROUND
                    else:
                        self._log.error(
                            "Unknown graphic rendition attribute: {attr!r}", attr=a
                        )

    def eraseLine(self):
        """
        Replace the cursor's row with an empty row.
        """
        self.lines[self.y] = self._emptyLine(self.width)

    def eraseToLineEnd(self):
        """
        Empty the cells from the cursor to the end of its row.
        """
        width = self.width - self.x
        self.lines[self.y][self.x :] = self._emptyLine(width)

    def eraseToLineBeginning(self):
        """
        Empty the cells from the start of the row up to and including the
        cursor.
        """
        # The cursor can sit at (or be positioned past) the right edge;
        # never replace more cells than the row holds, or the row would grow.
        count = min(self.x + 1, self.width)
        self.lines[self.y][:count] = self._emptyLine(count)

    def eraseDisplay(self):
        """
        Replace the whole screen with C{height} empty rows.
        """
        self.lines = [self._emptyLine(self.width) for i in range(self.height)]

    def eraseToDisplayEnd(self):
        """
        Empty everything from the cursor to the end of the screen.
        """
        self.eraseToLineEnd()
        height = self.height - self.y - 1
        self.lines[self.y + 1 :] = [self._emptyLine(self.width) for i in range(height)]

    def eraseToDisplayBeginning(self):
        """
        Empty everything from the start of the screen up to and including
        the cursor.
        """
        self.eraseToLineBeginning()
        self.lines[: self.y] = [self._emptyLine(self.width) for i in range(self.y)]

    def deleteCharacter(self, n=1):
        """
        Delete C{n} cells starting at the cursor, shifting the remainder of
        the row left and padding the end of the row with empty cells.
        """
        del self.lines[self.y][self.x : self.x + n]
        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))

    def insertLine(self, n=1):
        """
        Insert C{n} empty rows at the cursor's row; rows pushed past the
        bottom of the screen are discarded.
        """
        self.lines[self.y : self.y] = [self._emptyLine(self.width) for i in range(n)]
        del self.lines[self.height :]

    def deleteLine(self, n=1):
        """
        Delete up to C{n} rows starting at the cursor's row and append the
        same number of empty rows at the bottom of the screen.
        """
        before = len(self.lines)
        del self.lines[self.y : self.y + n]
        # Fewer than n rows are removed when the cursor is near the bottom;
        # append only as many as were removed so the row count is unchanged.
        removed = before - len(self.lines)
        self.lines.extend([self._emptyLine(self.width) for i in range(removed)])

    def reportCursorPosition(self):
        """
        Return the cursor position as an C{(x, y)} tuple.
        """
        return (self.x, self.y)

    def reset(self):
        """
        Restore the terminal to its initial state: cursor at the origin,
        no modes set apart from the auto-wrap and cursor private modes,
        application keypad mode, G0 active, default graphic rendition,
        default character set assignments and an empty screen.
        """
        self.home = insults.Vector(0, 0)
        self.x = self.y = 0
        self.modes = {}
        self.privateModes = {}
        self.setPrivateModes(
            [insults.privateModes.AUTO_WRAP, insults.privateModes.CURSOR_MODE]
        )
        # applicationKeypadMode/numericKeypadMode record their state in
        # keypadMode, so that is the attribute which must be reset.
        self.keypadMode = "app"
        # Historical attribute name; still assigned for backward
        # compatibility with code that reads it.
        self.numericKeypad = "app"
        self.activeCharset = insults.G0
        self.graphicRendition = self._defaultGraphicRendition()
        self.charsets = {
            insults.G0: insults.CS_US,
            insults.G1: insults.CS_US,
            insults.G2: insults.CS_ALTERNATE,
            insults.G3: insults.CS_ALTERNATE_SPECIAL,
        }
        # Must come last: empty cells capture the formatting state set above.
        self.eraseDisplay()

    def unhandledControlSequence(self, buf):
        """
        Report a control sequence this emulator does not understand by
        printing it.
        """
        print("Could not handle", repr(buf))

    def __bytes__(self):
        """
        Render the screen as bytes.

        Rows are joined with C{b"\\n"}.  Within a row, never-written cells
        are rendered as L{fill}, and everything after the last written cell
        is omitted.
        """
        lines = []
        for L in self.lines:
            buf = []
            # Length of the row up to and including the last written cell.
            length = 0
            for (ch, attr) in L:
                if ch is not self.void:
                    buf.append(ch)
                    length = len(buf)
                else:
                    buf.append(self.fill)
            lines.append(b"".join(buf[:length]))
        return b"\n".join(lines)

    def getHost(self):
        # ITransport.getHost
        raise NotImplementedError("Unimplemented: TerminalBuffer.getHost")

    def getPeer(self):
        # ITransport.getPeer
        raise NotImplementedError("Unimplemented: TerminalBuffer.getPeer")

    def loseConnection(self):
        # ITransport.loseConnection
        raise NotImplementedError("Unimplemented: TerminalBuffer.loseConnection")

    def writeSequence(self, data):
        # ITransport.writeSequence
        raise NotImplementedError("Unimplemented: TerminalBuffer.writeSequence")

    def horizontalTabulationSet(self):
        # ITerminalTransport.horizontalTabulationSet
        raise NotImplementedError(
            "Unimplemented: TerminalBuffer.horizontalTabulationSet"
        )

    def tabulationClear(self):
        # TerminalTransport.tabulationClear
        raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClear")

    def tabulationClearAll(self):
        # TerminalTransport.tabulationClearAll
        raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClearAll")

    def doubleHeightLine(self, top=True):
        # ITerminalTransport.doubleHeightLine
        raise NotImplementedError("Unimplemented: TerminalBuffer.doubleHeightLine")

    def singleWidthLine(self):
        # ITerminalTransport.singleWidthLine
        raise NotImplementedError("Unimplemented: TerminalBuffer.singleWidthLine")

    def doubleWidthLine(self):
        # ITerminalTransport.doubleWidthLine
        raise NotImplementedError("Unimplemented: TerminalBuffer.doubleWidthLine")
class ExpectationTimeout(Exception):
    """
    The failure delivered to a Deferred returned by
    C{ExpectableBuffer.expect} when its timeout elapses before the expected
    pattern shows up.
    """
class ExpectableBuffer(TerminalBuffer):
    """
    A L{TerminalBuffer} whose rendered contents can be waited on: callers
    register a regular expression with L{expect} and receive a Deferred that
    fires once the pattern appears.
    """

    # Offset into the rendered buffer up to which output has already been
    # consumed by earlier matches.
    _mark = 0

    def connectionMade(self):
        """
        Reset the terminal and start with no pending expectations.
        """
        TerminalBuffer.connectionMade(self)
        self._expecting = []

    def write(self, data):
        """
        Write C{data} to the terminal, then try to satisfy pending
        expectations.
        """
        TerminalBuffer.write(self, data)
        self._checkExpected()

    def cursorHome(self):
        """
        Home the cursor and rewind the consumed-output mark to the start.
        """
        TerminalBuffer.cursorHome(self)
        self._mark = 0

    def _timeoutExpected(self, d):
        """
        Fail C{d} with L{ExpectationTimeout}, then re-examine the queue of
        expectations.
        """
        d.errback(ExpectationTimeout())
        self._checkExpected()

    def _checkExpected(self):
        """
        Satisfy as many queued expectations as possible, in order.

        Only the oldest expectation is ever examined: entries whose timer is
        no longer active are dropped, a matching entry is removed and its
        Deferred fired with the match object, and processing stops as soon
        as the oldest entry does not match.
        """
        remaining = self.__bytes__()[self._mark :]
        while self._expecting:
            pattern, timer, deferred = self._expecting[0]

            # An entry with a timer that is no longer active is discarded.
            if timer and not timer.active():
                del self._expecting[0]
                continue

            match = pattern.search(remaining)
            if match is None:
                return

            if timer:
                timer.cancel()
            del self._expecting[0]

            # Advance past the matched text so it is not matched again.
            consumed = match.end()
            self._mark += consumed
            remaining = remaining[consumed:]
            deferred.callback(match)

    def expect(self, expression, timeout=None, scheduler=reactor):
        """
        Wait for C{expression} to appear in the not-yet-consumed output.

        @param expression: A regular expression, passed to C{re.compile},
            which is searched for in the rendered buffer.
        @param timeout: If true, the number of seconds after which the
            returned Deferred fails with L{ExpectationTimeout}.
        @param scheduler: The object whose C{callLater} schedules the
            timeout.

        @return: A Deferred which fires with the match object.
        """
        d = defer.Deferred()
        timer = (
            scheduler.callLater(timeout, self._timeoutExpected, d) if timeout else None
        )
        self._expecting.append((re.compile(expression), timer, d))
        self._checkExpected()
        return d
# Names exported by "from ... import *".
__all__ = ["CharacterAttribute", "TerminalBuffer", "ExpectableBuffer"]
|