9234ff6dcdb5b96ce11454dca898ffb5
Shadowsocks Probe II - TCP 代理过程

Shadowsocks Probe II - TCP 代理过程

(续 Shadowsocks Probe I - Socks5 与 EventLoop 事件分发

TCPRelay 及 Socket 监听事件

前文我们了解了 Shadowsocks 通过 EventLoop 的循环,整体构建了一个 Reactor 模式来将时间逐层传递给 EventHandler,然后在进行 I/O 处理。

我们再次查看 handle_event 的代码进行跟踪:

def handle_event(self, sock, fd, event):
   # 处理 events 并派发到指定的 handlers
   if sock:
       logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd,
                   eventloop.EVENT_NAMES.get(event, event))
   # 如果是 TCPRelay 的 socket
   # 这时候说明有 TCP 连接,创建 TCPRelayHandler 并封装
   if sock == self._server_socket:
       if event & eventloop.POLL_ERR:
           # TODO
           raise Exception('server_socket error')
       try:
           logging.debug('accept')
           # 接受连接
           conn = self._server_socket.accept()
           # 完成 handler 封装
           TCPRelayHandler(self, self._fd_to_handlers,
                           self._eventloop, conn[0], self._config,
                           self._dns_resolver, self._is_local)
       except (OSError, IOError) as e:
           error_no = eventloop.errno_from_exception(e)
           if error_no in (errno.EAGAIN, errno.EINPROGRESS,
                           errno.EWOULDBLOCK):
               return
           else:
               shell.print_exception(e)
               if self._config['verbose']:
                   traceback.print_exc()
   else:
       if sock:
           # 找到 fd 对应的 TCPRelayHandler
           handler = self._fd_to_handlers.get(fd, None)
           if handler:
               # 启用 handler 来处理读写事件
               handler.handle_event(sock, event)
       else:
           logging.warn('poll removed fd')

其实在源码中,作者对客户端链接建立的完全过程做了注释:

# for each opening port, we have a TCP Relay
# 对于每一个开启的端口,都会拥有一个 TCPRelay

# for each connection, we have a TCP Relay Handler to handle the connection
# 对于每一个连接请求,都有一个 TCPRelayHandler 来持有

# for each handler, we have 2 sockets:
#    local:   connected to the client
#    remote:  connected to remote server
# 对于每一个 handler,有两种 Sockets
#     local: 用于链接到客户端
#       remote: 用于链接到远程服务器

这里解释了每个对象的作用。其中,TCPRelay_server_socket 来记录监听端口的 Socket,_listen_port_listen_addr 分别代表监听的端口和地址。另外还有很多属性来记录各种状态,例如 _config 来记录配置,_is_local 记录是否为本地客户端请求,_fd_to_handler 持有对应的 handler 的映射关系。

如果产生 Event 的 Socket 是 TCPRelay 本身,那么 accept() 之后会建立一个新的 TCP 连接,并创建 TCPRelayHandler 对象来负责处理,这种事件用于 Client 端和 Proxy 端的连接和传递;否则说明 Event 的 Socket 为 TCPRelayHandler 的,这时候 fd 会检索到相关 handler 并调用 handle_event 来处理 Event。

在继续阅读之前,我们先来回顾 Event 在 epoll 的操作过程中可能出现的一些状态,以 C 代码中的宏名称为标准:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

eventloop.py 中我们找到了这些对应状态:

POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08
POLL_HUP = 0x10
POLL_NVAL = 0x20

EVENT_NAMES = {
    POLL_NULL: 'POLL_NULL',     # 事件为空
    POLL_IN: 'POLL_IN',            # 有数据可读
    POLL_OUT: 'POLL_OUT',        # 写数据不会导致阻塞
    POLL_ERR: 'POLL_ERR',        # 指定的文件描述符发生错误(revents 域)
    POLL_HUP: 'POLL_HUP',        # 指定的文件描述符挂起事件
    POLL_NVAL: 'POLL_NVAL',        # 指定的文件描述符非法
}

TCPRelay 在循环中拿到的拿到的 Event 只有状态为 POLL_IN 才能为其分配指定的 handle。而在分配之前,前面的那个代码片段仅仅是判断了 POLL_ERR 这种 fd 发生错误的状况。于是去查看一下 Event 投放的入口 add_to_loop 方法:

def add_to_loop(self, loop):
    if self._eventloop:
        raise Exception('already add to loop')
    if self._closed:
        raise Exception('already closed')
    self._eventloop = loop
    # 仅仅向 loop 中投递两种状态的 event
    self._eventloop.add(self._server_socket,
                        eventloop.POLL_IN | eventloop.POLL_ERR, self)
    # 为循环增加周期
    self._eventloop.add_periodic(self.handle_periodic)

add_to_loop 中仅注册了可读事件,所以不会出现其他的情况。

在前文中,我们说一个 TCPRelay 代表的其实就是一个普通的 TCP 服务器,其中持有了所监听的端口(_listen_port)和 IP 地址(_listen_addr)。但是我们在使用 Shadowsocks 时,在 PAC 过滤文件中仅仅用域名字符串数组的形式来存储,并无法得知其 IP,那我们的 TCPRelay 如何对事件对指定的地址进行转发呢?我们可以在 __init__ 构造方法的代码中发现答案:

addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                           socket.SOCK_STREAM, socket.SOL_TCP)
af, socktype, proto, canonname, sa = addrs[0]
server_socket = socket.socket(af, socktype, proto)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(sa)

不禁要问 getaddrinfo 方法是什么,问什么能拿到这么多信息?我们用 iPython 直接调试一下这个方法:

可以发现 getaddrinfo 可以对域名做地址解析。并且通过文档查询该方法会将主机参数转换为包含用于创建连接到该服务的 Socket 的所有必须参数的 5 元祖集合。其函数范式如下:

socket.getaddrinfo(host, port[, family[, socktype[, proto[, flags]]]])

> return (family, socktype, proto, canonname, sockaddr)

由此可以看出,这个方法可以获取到主机的全部信息,是 TCPRelay 成功监听的关键之一。

TCPRelay.description

TCPRelay.description

TCPRelayHandler

TCPRelay 转发给 TCPRelayHandler 之后,handle_event 会对应选择处理的事件。它会再进一步,根据事件的种类,调用相应的函数来做处理,这也是一个转发的过程。注意,这里的 handle_event 是由 TCPRelayHandler 实例的方法属性,而不是上文中出现的 TCPRelay 中的属性。在这个方法中,会追踪 Socket 来决策调用方法。

在看源码之前,我们先来了解一下 stage 过程是如何定义的(还是在 tcprelay.py 中):

# as ssserver:
# stage 0 初始状态,SOCKS5 握手,仅仅需要跳入 STAGE_ADDR 状态即可
# stage 1 SOCKS5 建立连接阶段,从本地获取地址(addr),DNS 进行解析
# stage 3 DNS 解析后连接到远端
# stage 4 从本地获取数据,保持连接
# stage 5 建立管道(pipe),SOCK5 传输

STAGE_INIT = 0
STAGE_ADDR = 1
STAGE_UDP_ASSOC = 2
STAGE_DNS = 3
STAGE_CONNECTING = 4
STAGE_STREAM = 5
STAGE_DESTROYED = -1

了解了各个 stage 过程的定义,我们来看下 handle_event 是如何根据 stage 过程种类来调用相应的函数的。这个过程也就是我们了解的转发过程

def handle_event(self, sock, event):
    # 处理所有事件并派发给指定的方法
    if self._stage == STAGE_DESTROYED:
        logging.debug('ignore handle_event: destroyed')
        return
    # 处理顺序很重要,优先级相关
    # 处理 self._romte_sock Socket 
    if sock == self._remote_sock:
        # 文件描述符错误,直接进入 STAGE_DESTROYED 阶段
        if event & eventloop.POLL_ERR:
            self._on_remote_error()
            if self._stage == STAGE_DESTROYED:
                return
        # 有数据可读或已经被挂起,进行读数据事件
        if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
            self._on_remote_read()
            # 如果已经进入 STAGE_DESTROYED 结束处理
            if self._stage == STAGE_DESTROYED:
                return
        # 如果需要写数据,进行写数据事件
        if event & eventloop.POLL_OUT:
            self._on_remote_write()
    # 处理 self._local_sock Socket
    elif sock == self._local_sock:
        if event & eventloop.POLL_ERR:
            self._on_local_error()
            if self._stage == STAGE_DESTROYED:
                return
        if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
            self._on_local_read()
            if self._stage == STAGE_DESTROYED:
                return
        if event & eventloop.POLL_OUT:
            self._on_local_write()
    else:
        # 未知套接字直接打印 log
        logging.warn('unknown socket')

以上代码中,handle_event 会根据事件发生的 Socket 决定调用方法,我们发现 self._remote_sockself._local_sock 两个套接字,其执行方法是对称的。这两个套接字的作用分别是什么作用呢?用一张图来描述一下整个的约定:

agreement_ne

agreement_ne

这是上一篇文章中一张插图的修改,其中黑色圆点代表 local_sock,红色圆点代表 remote_sock。由于 Shadowsocks 源码是客户端和服务端复用的代码,所以对于 sslocalssserver 而言其 Socket 表示的含义不同:

  • sslocal 而言:local_sock 指的是 SOCK5 客户端,remote_sock 指的是 ssserver
  • ssserver 而言:local_sock 指的是 sslocalremote_sock 指的是目标服务器。

我们可以按照图示位置(想对与当前节点而言),将设备分为左端和右端。抽象出来 local_sock 是与左端设备通信的 Socket,而 remote_sock 是与右端设备通信的 Socket。这样我们就能发现其对称性,公用一套相同逻辑的代码。

事件投递细节

之前提及到的 handle_event 方法,当与左端通信的时候,会涉及到三个方法,并对应如下几种事件来定制处理,详细来看一下:

  • POLL_ERRself._on_local_error() 错误处理。
  • POLL_INPOLL_HUPself._on_local_read() 读事件。
  • POLL_OUTself._on_local_write() 写事件。

_on_local_read 方法

def _on_local_read(self):
    # 处理所有的本地读事件并派发给不同 stage 处理方法
    # 判断非 sslocal 状态,预防判断
    if not self._local_sock:
        return
    is_local = self._is_local
    data = None
    # 如果是 sslocal 则用上端极值
    if is_local:
        buf_size = UP_STREAM_BUF_SIZE
    else:
        buf_size = DOWN_STREAM_BUF_SIZE
    # 接受至多 buf_size 大小的数据
    try:
        data = self._local_sock.recv(buf_size)
    # 异常处理,分成两种情况
    #   1. 如果异常原因为 ETIMEOUT, EAGAIN, EWOULDBLOCK 直接 return
    #   2. 否则直接销毁当前的 TCPRelayHandler
    except (OSError, IOError) as e:
        if eventloop.errno_from_exception(e) in \
                (errno.ETIMEDOUT, errno.EAGAIN, errno.EWOULDBLOCK):
            return
    if not data:
        self.destroy()
        return
    # 计时器重置
    self._update_activity(len(data))

    # 如果 data 是 sslocal 发送来的,直接解密数据
    if not is_local:
        data = self._cryptor.decrypt(data)
        # 判断数据解析后是否合法
        if not data:
            return

    # 多状态分类讨论,即状态机分类
    if self._stage == STAGE_STREAM:
        self._handle_stage_stream(data)
        return
    elif is_local and self._stage == STAGE_INIT:
        # jump over socks5 init
        if self._is_tunnel:
            self._handle_stage_addr(data)
            return
        else:
            self._handle_stage_init(data)
    elif self._stage == STAGE_CONNECTING:
        self._handle_stage_connecting(data)
    elif (is_local and self._stage == STAGE_ADDR) or \
            (not is_local and self._stage == STAGE_INIT):
        self._handle_stage_addr(data)

该方法从 local_sock 中读取数据,并通过状态来派发给不同的方法进行处理。在 _on_local_read 的实现中可以看出状态机做出了以下方法映射:

  • STAGE_INITself._handle_stage_init
  • STAGE_ADDRself._handle_stage_addr
  • STAGE_CONNECTINGself._handle_stage_connecting
  • STAGE_STREAMself_handle_stage_stream

而其他的状态仅仅代表一个中间过程量标记,而不导致任何实质性的改变。当然这些状态对于 TCPRelayHandler 的状态划分是十分重要的。既然我们已经确定 TCPRelayHandler 是一个状态机,那么我们将 sslocalssserver 的状态转移列举出来,从而更清晰的把握整体行为驱动:

STAGE

STAGE

ssserver 的状态流程在上述代码中也能找到一些差异:

elif (is_local and self._stage == STAGE_ADDR) or \
       (not is_local and self._stage == STAGE_INIT):
   self._handle_stage_addr(data)

not is_local 对应的是从 sslocal 发送来的数据,所以 ssserver 真正的起始状态是 STAGE_ADDR,这也验证了上一篇文的一个观点:ssserver 只起到中继作用,负责解密以后将数据转发给目标服务器,并不涉及 SOCK5 协议中任何一部分。而在 STAGE_INIT 阶段主要的工作是SOCKS5 握手,所以 ssserver 没有这个流程也是理所当然的。

_on_local_error 方法

```python

嗅探到是错误,写日志直接销毁

def _on_local_error(self):

top Created with Sketch.