Kombu 里面使用 Transport 类来表示一个具体的消息代理(Broker),目前包含 Redis、MongoDB、Zookeeper、Django、SQLAlchemy 等类型。这种对不同类型实现相同接口的需求要求我们要设计成可扩展的方式。

我之前写代码,习惯这么设计:

  1. 写一个基类 Transport,定义还未实现的那些接口。
  2. 继承这个基类,实现对应的接口。
  3. 调用的时候通过一个带有别名和对应类的字典找到这个类。

如果新加一种类型,就是实现这个类型的 Transport,然后在对应关系的映射里面加在它。

Kombu 实现的更深入一些。今天我们分析下它是怎么实现的。

首先 Kombu 也有一个基类 Transport

from kombu.transport import base


class Channel(AbstractChannel, base.StdChannel):
    # 队列和消息处理的逻辑
    ...

class Transport(base.Transport):
    Channel = Channel
    implements = base.Transport.implements.extend(
        exchange_type=frozenset(['direct', 'topic']),
        ...
    ) 
    ...

这个 base.Transport 相当于预先定义了一些接口,相当于更加「基类」,这就不看了,权当这个 Transport 是各种消息代理的基类吧。

我们看一下 MongoDB 类型的实现:

from . import virtual


class Channel(virtual.Channel):
    # MongoDB的逻辑


class Transport(virtual.Transport):
    Channel = Channel  # 在这里被替换成MongoDB的Channel逻辑了
    driver_type = 'mongodb'
    driver_name = 'pymongo'

    implements = virtual.Transport.implements.extend(
        exchange_type=frozenset(['direct', 'topic', 'fanout']),
    )  # 不同消息代理能实现的交换类型是有区别的
    ...

套路来了:每种 Transport 使用了完全不同的 Channel,其他需要不一样处理的地方也会被覆写。

看起来和我上面说的方式也没什么不同嘛?重点来了,看它怎么用的,首先我们先了解 2 个函数:

  1. symbol_by_name 函数可以通过字符串转化成对应的类对象:

    >>> symbol_by_name('celery.concurrency.processes.TaskPool')
     <class 'celery.concurrency.processes.TaskPool'>
    

    它和 werkzeug.utils.import_string 的作用差不多,但是更符合业务需要。

  2. fmatch_best 函数是用来模糊匹配的,如果你不小心输错了他可以基于现有资源告诉你最符合的那个:

```python
>>> fmatch_best('hello', ['xxx', 'hell', 'hea'])
'hell'
```

有兴趣的可以研究kombu的实现。

回答正题看看它怎么实现的:

TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    'memory': 'kombu.transport.memory:Transport',
    'redis': 'kombu.transport.redis:Transport',
    'mongodb': 'kombu.transport.mongodb:Transport',
    ...
}

_transport_cache = {}


def resolve_transport(transport=None):
    try:
        transport = TRANSPORT_ALIASES[transport]
    except KeyError:
        if '.' not in transport and ':' not in transport:
            from kombu.utils.text import fmatch_best
            alt = fmatch_best(transport, TRANSPORT_ALIASES)
            if alt:
                raise KeyError(
                    'No such transport: {0}.  Did you mean {1}?'.format(
                        transport, alt))
            raise KeyError('No such transport: {0}'.format(transport))
    else:
        if callable(transport):
            transport = transport()
    return symbol_by_name(transport)


def get_transport_cls(transport=None):
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]

使用一下:

In [6]: from kombu.transport import get_transport_cls

In [7]: get_transport_cls('mongodb')
Out[7]: kombu.transport.mongodb.Transport

它比较好的设计有 2 个:

  1. 使用了缓存。Kombu 把获取的对应关系存在了_transport_cache,但是你不去获取它什么都不会做。
  2. 竟然比较好的支持了模糊匹配!!ↁ_ↁ
  3. 通过字符串获得对应的类对象的实现非常智能,不用在__init__里面把所有类型 import 进来在 alias 一下,否则就要这样了:
from .mongodb import Transport as MongodbTransport
from .redis import Transport as RedisTransport
...

TRANSPORT_ALIASES = {
    'redis': RedisTransport,
    'mongodb': MongodbTransport,
    ...
}

唯一我觉得可以简化的地方是 TRANSPORT_ALIASES 中的对应关系,因为大部分情况下,键的名字和类型文件的名字是对应的,比如「'redis': 'kombu.transport.redis:Transport'」中的 redis 其实就一种命令规则,除了一些 amqp 类型的对应关系外,我们显然可以通过 XX 直接尝试去获取kombu.transport.XX:Transport类。但是 kombu 为啥没有省着差不多 10 行的代码呢?

这涉及到了代码可读性可维护性的问题,没有必要为了极简的代码量增加复杂度。