
[Paramiko] Encoding error when using readline on a file opened over SFTP

루ㅌ 2020. 2. 7. 09:21

Paramiko lets you use SFTP from Python.

 

When the remote machine and the machine running your program use different operating systems or different string encodings, you may run into a UnicodeDecodeError.

 

I hit a UnicodeDecodeError while using Paramiko to open a remote txt file and read it one line at a time with readline.
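To make the cause concrete, here is a minimal sketch that reproduces the same kind of failure without any SFTP connection. It assumes the remote file was saved as EUC-KR; the sample text is made up for illustration.

```python
# Bytes as they might be stored on a remote machine that uses EUC-KR
# (an assumed example, not data from a real server).
raw = "한글".encode("euc-kr")

# Decoding those bytes as UTF-8 fails, which is the error described above.
try:
    raw.decode("utf8")
    decode_failed = False
except UnicodeDecodeError:
    decode_failed = True

# Decoding with the encoding the file was actually written in works.
text = raw.decode("euc-kr")
print(decode_failed, text)
```

The bytes themselves are fine; the error only comes from decoding them with the wrong codec.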

 

readline takes no parameter for choosing the encoding, so I modified the relevant Python files inside the installed Paramiko package as shown below.

 

py3compat.py    

Change def u() to:

    def u(s, encoding="utf8"):  # NOQA
        """cast bytes or unicode to unicode"""
        if isinstance(s, str):
            return s.decode(encoding)
        elif isinstance(s, unicode):  # NOQA
            return s
        elif isinstance(s, buffer):  # NOQA
            return s.decode(encoding)
        else:
            raise TypeError("Expected unicode or bytes, got {!r}".format(s))

Here I added a parameter named encoding and used it in place of the hard-coded "utf8".
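The function above uses Python 2 names (unicode, buffer). As a rough Python 3 sketch of the same idea, the helper below decodes bytes with a caller-supplied encoding and passes text through unchanged. The name to_text is hypothetical and this is not Paramiko's own code.

```python
def to_text(s, encoding="utf8"):
    """Return s as text, decoding bytes with the given encoding."""
    if isinstance(s, bytes):
        return s.decode(encoding)
    elif isinstance(s, str):
        # Already text, nothing to decode.
        return s
    else:
        raise TypeError("Expected str or bytes, got {!r}".format(s))


# The default stays UTF-8, so existing callers keep their behavior.
ascii_text = to_text(b"hello")
# A caller that knows the real encoding can now say so.
korean_text = to_text("한글".encode("euc-kr"), encoding="euc-kr")
print(ascii_text, korean_text)
```

Keeping "utf8" as the default means code that never passes encoding behaves as before.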

 

file.py

Change def readline(self) to:

    def readline(self, size=None, encoding="utf8"):
        """
        Read one entire line from the file.  A trailing newline character is
        kept in the string (but may be absent when a file ends with an
        incomplete line).  If the size argument is present and non-negative, it
        is a maximum byte count (including the trailing newline) and an
        incomplete line may be returned.  An empty string is returned only when
        EOF is encountered immediately.

        .. note::
            Unlike stdio's ``fgets``, the returned string contains null
            characters (``'\\0'``) if they occurred in the input.

        :param int size: maximum length of returned string.
        :returns:
            next line of the file, or an empty string if the end of the
            file has been reached.

            If the file was opened in binary (``'b'``) mode: bytes are returned
            Else: the bytes are decoded with ``encoding`` (UTF-8 by default)
            and character strings (`str`) are returned
        """
        # it's almost silly how complex this function is.
        if self._closed:
            raise IOError("File is closed")
        if not (self._flags & self.FLAG_READ):
            raise IOError("File not open for reading")
        line = self._rbuffer
        truncated = False
        while True:
            if (
                self._at_trailing_cr
                and self._flags & self.FLAG_UNIVERSAL_NEWLINE
                and len(line) > 0
            ):
                # edge case: the newline may be '\r\n' and we may have read
                # only the first '\r' last time.
                if line[0] == linefeed_byte_value:
                    line = line[1:]
                    self._record_newline(crlf)
                else:
                    self._record_newline(cr_byte)
                self._at_trailing_cr = False
            # check size before looking for a linefeed, in case we already have
            # enough.
            if (size is not None) and (size >= 0):
                if len(line) >= size:
                    # truncate line
                    self._rbuffer = line[size:]
                    line = line[:size]
                    truncated = True
                    break
                n = size - len(line)
            else:
                n = self._bufsize
            if linefeed_byte in line or (
                self._flags & self.FLAG_UNIVERSAL_NEWLINE and cr_byte in line
            ):
                break
            try:
                new_data = self._read(n)
            except EOFError:
                new_data = None
            if (new_data is None) or (len(new_data) == 0):
                self._rbuffer = bytes()
                self._pos += len(line)
                return line if self._flags & self.FLAG_BINARY else u(line, encoding=encoding)
            line += new_data
            self._realpos += len(new_data)
        # find the newline
        pos = line.find(linefeed_byte)
        if self._flags & self.FLAG_UNIVERSAL_NEWLINE:
            rpos = line.find(cr_byte)
            if (rpos >= 0) and (rpos < pos or pos < 0):
                pos = rpos
        if pos == -1:
            # we couldn't find a newline in the truncated string, return it
            self._pos += len(line)
            return line if self._flags & self.FLAG_BINARY else u(line, encoding=encoding)
        xpos = pos + 1
        if (
            line[pos] == cr_byte_value
            and xpos < len(line)
            and line[xpos] == linefeed_byte_value
        ):
            xpos += 1
        # if the string was truncated, _rbuffer needs to have the string after
        # the newline character plus the truncated part of the line we stored
        # earlier in _rbuffer
        if truncated:
            self._rbuffer = line[xpos:] + self._rbuffer
        else:
            self._rbuffer = line[xpos:]

        lf = line[pos:xpos]
        line = line[:pos] + linefeed_byte
        if (len(self._rbuffer) == 0) and (lf == cr_byte):
            # we could read the line up to a '\r' and there could still be a
            # '\n' following that we read next time.  note that and eat it.
            self._at_trailing_cr = True
        else:
            self._record_newline(lf)
        self._pos += len(line)
        return line if self._flags & self.FLAG_BINARY else u(line, encoding=encoding)

The only thing changed here is the part that passes the parameter on to the u() function modified above.

 

Passing the encoding you want, as in "readline(encoding = 'euc-kr')", resolves the encoding errors that can occur when using readline.
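The readline docstring above notes that a file opened in binary ('b') mode returns bytes. If you would rather not edit library files, one possible alternative is to read bytes and decode them yourself. The sketch below is only an illustration under that assumption: iter_decoded_lines is a hypothetical helper, and io.BytesIO stands in for the remote file so the example runs without a server.

```python
import io


def iter_decoded_lines(fileobj, encoding="utf8"):
    """Yield text lines from a file-like object whose readline() returns bytes."""
    while True:
        raw = fileobj.readline()
        if not raw:  # empty bytes means end of file
            break
        yield raw.decode(encoding)


# Stand-in for a remote EUC-KR file opened in binary mode. With Paramiko this
# would be something like sftp.open(path, "rb"), where sftp is an SFTPClient
# and path is your own remote path (both assumed here).
remote_like = io.BytesIO("안녕하세요\n한글\n".encode("euc-kr"))

lines = list(iter_decoded_lines(remote_like, encoding="euc-kr"))
print(lines)
```

Splitting on the newline byte before decoding is safe for EUC-KR because the newline byte never appears inside a multi-byte character in that encoding; other encodings (UTF-16, for example) would need different handling.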