相信很多使用 Python 的同学熟悉编解码,比如:

In : print u'\U0001F3F4'.encode('utf-8')
🏴
In : '哈哈'.decode('utf8')
Out: u'\u54c8\u54c8'

这样可以在字符串和 unicode 之前转换,不过细心的同学可能发现了,我使用了「utf-8」和「utf8」,这 2 个词长得很像。事实上都能正常使用是由于他们都是「utf_8」的别名,这些别名的对应关系可以用如下方法找到:

In : import encodings

In : encodings.aliases.aliases['utf8']
Out: 'utf_8'

In : '哈哈'.decode('u8')
Out: u'\u54c8\u54c8'

In : '哈哈'.decode('utf')
Out: u'\u54c8\u54c8'

In : '哈哈'.decode('utf8_ucs2')
Out: u'\u54c8\u54c8'

In : '哈哈'.decode('utf8_ucs4')
Out: u'\u54c8\u54c8'

encodings 是标准库中自带的编码库,其中包含了上百个标准的编码转换方案。但是通常并不需要使用 encodings,而是使用 codecs。

codecs 包含了编码解码器的注册和其他基本的类,开发者还可以通过 codecs 提供的接口自定义编 / 解码方案,也就是可以创造一个新的编解码转换方案,使用 encode ('XX') 和 decode ('XX') 的方式使用。今天我给大家演示直接进行 Fernet 对称加密的例子。

互联网安全的重要性不必在复述了,大家都应该接触过一些加密技术,可能听过 M2Crypto、PyCrypto、Cryptography 之类的库。在这里歪个楼,现在的主流是使用 Cryptography,它的出现就是为了替代之前的那些库,具体的可以看官方文档。我们使用 Cryptography 提供的 Fernet 类来实现,首先实现一个 Codec 类:

import codecs

from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken

KEY = Fernet.generate_key()
f = Fernet(KEY)


class FernetCodec(codecs.Codec):
    def encode(self, input, errors='fernet.strict'):
        return f.encrypt(input), len(input)

    def decode(self, input, errors='fernet.strict'):
        try:
            return f.decrypt(input), len(input)
        except InvalidToken:
            error = codecs.lookup_error(errors)
            return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
                                            'Invalid Token'))

当然也不必在类中实现 encode 和 decode 方法,单独的 2 个函数也可以。我这里是为了给之后的演示到的类复用。

如果你看过内置的字符串 encode 的方法,它的文档说还接收第二个参数,让你告诉它当出现出错的时候如何去处理,默认是 strict,直接就会抛出来错误。 其余可选的还有 ignore、replace、xmlcharrefreplace:

In [35]: '\x80abc'.decode('utf-8', 'strict')
---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
<ipython-input-35-974ebc908d50> in <module>()
----> 1 '\x80abc'.decode('utf-8', 'strict')

/Users/dongweiming/dae/venv/lib/python2.7/encodings/utf_8.pyc in decode(input, errors)
     14
     15 def decode(input, errors='strict'):
---> 16     return codecs.utf_8_decode(input, errors, True)
     17
     18 class IncrementalEncoder(codecs.IncrementalEncoder):

UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte

In [36]: '\x80abc'.decode('utf-8', 'replace')
Out[36]: u'\uffabc'

In [37]: '\x80abc'.decode('utf-8', 'ignore')
Out[37]: u'abc'

事实上 Python 还内置了其他的选项,如 backslashreplace、namereplace、surrogatepass、surrogateescape 等,有兴趣的可以看 源码实现 .

我也会定义 2 种错误函数,因为在解密(执行 decrypt)的时候可能会报 InvalidToken 错误,但是 InvalidToken 不包含任何参数,而对错误处理的时候需要知道起始和结束的位置,所以我就直接抛一个 UnicodeDecodeError 错误了。

接着我们定义递增式和流式的编码类:

class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):
    pass


class IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):
    pass


class StreamReader(FernetCodec, codecs.StreamReader):
    pass


class StreamWriter(FernetCodec, codecs.StreamWriter):
    pass

由于这个要加密的字符串不会很长,没有必要实现这些类,我就简单的继承然后 pass 了。接着开开放入口:

def getregentry():
    return codecs.CodecInfo(
        name='fernet',
        encode=FernetCodec().encode,
        decode=FernetCodec().decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
    )

其实 incrementalencoder、streamwriter 这些参数不传递也是可以的,默认是 None,今天只是为了让大家知道是有这部分接口的。然后是注册这个入口,为了提供更好的性能,我创建一个函数加上缓存功能:

_cache = {}
_unknown = '--unknown--'

def search_function(encoding):
    import encodings
    encoding = encodings.normalize_encoding(encoding)
    entry = _cache.get(encoding, _unknown)
    if entry is not _unknown:
        return entry

    if encoding == 'fernet':
        entry = getregentry()
        _cache[encoding] = entry
        return entry


codecs.register(search_function)

这里简单介绍下 normalize_encoding 的意义,之前我说到了别名,我们再感受下:

In : u'哈哈'.encode('utf-8')
Out: '\xe5\x93\x88\xe5\x93\x88'

In : u'哈哈'.encode('utf_8')
Out: '\xe5\x93\x88\xe5\x93\x88'

这种中划线和下划线最后无差别对待,就是通过这个函数标准化的。

codecs 模块底层维护了一个搜索函数的列表,通过调用 codecs.register 方法就把上述函数 append 进去了。

最后我们注册 2 个错误处理的函数:

def strict_errors(exc):
    if isinstance(exc, UnicodeDecodeError):
        raise TypeError('Invalid Token')


def fallback_errors(exc):
    s = []
    for c in exc.object[exc.start:exc.end]:  # 只是为了演示提供的接口,实施上就是返回原来的input
        s.append(c)
    return ''.join(s), exc.end

codecs.register_error('fernet.strict', strict_errors)
codecs.register_error('fernet.fallback', fallback_errors)

默认使用的是 fernet.strict 这种处理方案,也可以使用 fallback 模式。ok,我们现在感受一下:

In [1]: import fernet

In [2]: input = 'hah'

In [3]: output = input.encode('fernet')

In [4]: output  # 已经是加密后的结果了
Out[4]: 'gAAAAABZCFa6Znp2a_e9O0VqP6qToO6T3xRbF7O-adtpFC4QYO7jvVc6Yrcwbo6YGQfL8g5HCXcsaan_THWNhjZAorPTwlQQTA=='

In [5]: output.decode('fernet')  # 解密后还原成原来的字符串
Out[5]: 'hah'

In [6]: input.decode('fernet')
'fernet' codec can't decode bytes in position 0-3: Invalid Token
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-6-1c20dc246c52> in <module>()
----> 1 input.decode('fernet')

/Users/dongweiming/test/tmp/fernet.pyc in decode(self, input, errors)
     20             error = codecs.lookup_error(errors)
     21             return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
---> 22                                             'Invalid Token'))
     23
     24

/Users/dongweiming/test/tmp/fernet.pyc in strict_errors(exc)
     68     print exc
     69     if isinstance(exc, UnicodeDecodeError):
---> 70         raise TypeError('Invalid Token')
     71
     72

TypeError: Invalid Token  # 抛了个自定义错误

In [7]: input.decode('fernet', 'fernet.fallback')
Out[7]: 'hah'  # 解密不成功,返回原来的字符串

好了,自定义的加解密方案完成了,整理全部的代码如下:

import codecs

from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken

KEY = Fernet.generate_key()
f = Fernet(KEY)
_cache = {}
_unknown = '--unknown--'


class FernetCodec(codecs.Codec):
    def encode(self, input, errors='fernet.strict'):
        return f.encrypt(input), len(input)

    def decode(self, input, errors='fernet.strict'):
        try:
            return f.decrypt(input), len(input)
        except InvalidToken:
            error = codecs.lookup_error(errors)
            return error(UnicodeDecodeError('fernet', input, 0, len(input) + 1,
                                            'Invalid Token'))


class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):
    pass


class IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):
    pass


class StreamReader(FernetCodec, codecs.StreamReader):
    pass


class StreamWriter(FernetCodec, codecs.StreamWriter):
    pass



def getregentry():
    return codecs.CodecInfo(
        name='fernet',
        encode=FernetCodec().encode,
        decode=FernetCodec().decode,
        incrementalencoder=IncrementalEncoder,
        incrementaldecoder=IncrementalDecoder,
        streamwriter=StreamWriter,
        streamreader=StreamReader,
    )


def search_function(encoding):
    import encodings
    encoding = encodings.normalize_encoding(encoding)
    entry = _cache.get(encoding, _unknown)
    if entry is not _unknown:
        return entry

    if encoding == 'fernet':
        entry = getregentry()
        _cache[encoding] = entry
        return entry


def strict_errors(exc):
    if isinstance(exc, UnicodeDecodeError):
        raise TypeError('Invalid Token')


def fallback_errors(exc):
    s = []
    for c in exc.object[exc.start:exc.end]:
        s.append(c)
    return ''.join(s), exc.end


codecs.register(search_function)
codecs.register_error('fernet.strict', strict_errors)
codecs.register_error('fernet.fallback', fallback_errors)