读源码-Gunicorn篇-2-启动

本节说明

上一节我们已经可以将服务跑起来了,但是服务器是怎么跑起来的?

本章节我们就来跟踪一下 gunicorn 的启动流程,以及它的进程是如何管理的。

入口

我们知道,一个简单的 gunicorn 服务启动命令,可以在命令行直接输入 gunicorn main:app,那么这个 gunicorn 命令是从哪里来的呢?

gunicorn/pyproject.toml 文件中,有这样一个配置段,[project.scripts]

1
2
3
[project.scripts]
# duplicates "python -m gunicorn" handling in __main__.py
gunicorn = "gunicorn.app.wsgiapp:run"

gunicorn = "gunicorn.app.wsgiapp:run",这个配置就是 gunicorn 程序启动的入口,我们在 pip install gunicorn时,会根据这个配置,在 env/bin 目录生成 gunicorn 的启动脚本,里面的内容是这样的:

1
2
3
4
5
6
7
8
#!/Users/chundi/explorer/env/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from gunicorn.app.wsgiapp import run
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(run())

可以看到也是通过 gunicorn.app.wsgiapp:run 启动的,这个 run 方法就是程序的入口。

调试

我们已经找到了程序的入口,上一节我们也配置好了调试环境,只需要在想要停止的地方打上断点,点击调试就可以开始了!

启动

run 方法打上断点,从入口一步步跟下去,看服务是如何跑起来的。

gunicorn.app.wsgiapp:run
1
2
3
def run(prog=None):
from gunicorn.app.wsgiapp import WSGIApplication
WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]", prog=prog).run()

run 方法中可以看到,它先生成了一个 WSGIApplication 的对象,然后调用了这个对象的 run 方法,那么这个 WSGIApplication 又是什么呢?

classDiagram
    direction LR
    class BaseApplication {
        - usage
        - cfg
        - callable
        - prog
        - logger
        + \__init__(usage, prog)
        + do_load_config()
        + load_default_config()
        + init(parser, opts, args) NotImpl
        + load() NotImpl
        + load_config() NotImpl
        + reload()
        + wsgi()
        + run()
    }

    class Application {
        ...
        + chdir()
        + get_config_from_filename(filename)
        + get_config_from_module_name(module_name)
        + load_config_from_module_name_or_filename(location)
        + load_config_from_file(filename)
        + load_config()
        + run()
    }

    class WSGIApplication {
        - app_uri
        + init(parser, opts, args)
        + load_config()
        + load_wsgiapp()
        + load_pasteapp()
        + load()
    }

    BaseApplication <|-- Application
    Application <|-- WSGIApplication

从继承关系上来看,WSGIApplication 其实是 ApplicationBaseApplication 的子类,BaseApplicationgunicorn 的基本程序接口,提供了配置处理相关的业务逻辑。这几个类的实现是一个典型的 模板方法 设计模式。

因为 WSGIApplication 类没有提供 __init__ 方法,所以在生成 WSGIApplication 对象时会调用父类 BaseApplication__init__ 方法:

BaseApplication:__init__
1
2
3
4
5
6
7
def __init__(self, usage=None, prog=None):
self.usage = usage
self.cfg = None
self.callable = None
self.prog = prog
self.logger = None
self.do_load_config()

可以看到它初始化了内部属性后,执行了加载配置的代码,之后就返回了。加载配置的代码比较繁琐,后续会详细研究,这里先略过。接着回到服务入口那里,当 WSGIApplication 对象生成后,它调用了 run 方法来把服务跑起来,我们看下这个方法做了什么。

WSGIApplication 这个类也没有实现 run 方法,根据 MRO 能知道它会先调用 Application.run 方法,里面对配置做了一些检查、输出后,向上调用了 BaseApplication.run

BaseApplication.run
1
2
3
4
5
6
7
def run(self):
try:
Arbiter(self).run()
except RuntimeError as e:
print("\nError: %s\n" % e, file=sys.stderr)
sys.stderr.flush()
sys.exit(1)

它将自己作为一个参数,生成了 Arbiter 对象,调用了它的 run 方法,那么这个 Arbiter 又是什么?

进程管理

Arbiter 类是 gunicorn 的核心组件,它负责主进程的所有生命周期管理,server 的启动、worker 进程的管理、监控,系统信号处理,日志处理,资源回收等。主要属性与方法如下:

classDiagram
    class Arbiter {
        - ...
        - app
        - cfg
        - worker_class
        + \__init__
        + start
        + run
        + handle_...
        + reexec
        + reload
        + ..._workers
        + manage_workers
        + spawn_workers
        + kill_workers
        + ...
    }

Arbiter 根据传入的 app 对象完成了初始化,之后执行了它的 run 方法。

Arbiter.run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def run(self):
"Main master loop."
self.start()
util._setproctitle("master [%s]" % self.proc_name)

try:
self.manage_workers()

while True:
# blah blah
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
if sig is None:
self.sleep()
self.murder_workers()
self.manage_workers()
continue

# process signals
except Exception:
# blah blah
sys.exit(-1)

这个方法中首先调用了 start 方法:

Arbiter.start
1
2
3
4
5
6
7
8
9
def start(self):
# blah blah
self.init_signals()

if not self.LISTENERS:
fds = None
# blah blah
self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
# blah blah

start 方法中,首先是注册了主进程要处理的 signals,之后绑定了 IP 和端口进行监听后返回。

回到 run 方法,下一步调用了 manage_workers,里面判断了当前 worker 的数量是否与配置的数量一致,如果不一致则调用 spawn_worker 创建配置指定的 worker 对象。

worker 是如何创建的?

我们看一下 spawn_worker 方法:

Arbiter.spawn_worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def spawn_worker(self):
# blah blah
pid = os.fork()
if pid != 0:
worker.pid = pid
self.WORKERS[pid] = worker
return pid

# blah blah

# Process Child
worker.pid = os.getpid()
try:
# blah blah
worker.init_process()
sys.exit(0)
except Exception:
# blah blah
sys.exit(-1)
finally:
# blah blah

这是一个标准的创建子进程的流程,使用 fork 系统调用 来创建 worker 进程。

pid = os.fork() 之后,在主进程中,获取到的会是新创建的子进程的 pid,而在子进程中,拿到的 pid 会是 0。主进程和子进程在这里的执行路径会分叉,主进程设置保存 worker 进程对象后返回;子进程设置自己的 pid 后,执行了 worker 进程的初始化流程 worker.init_processworker 的业务逻辑后续再讨论,本章节我们继续看主进程的处理流程。

主进程在创建子进程后,回到 run 方法,在创建完 worker 进程后,主进程的准备工作就完成了,之后就是进入主循环,循环中每次都会查看当前是否有需要处理的信号,如果没有就 sleep 一秒钟让出 cpu,之后再次检查,有的话就处理相应的信号。

信号是如何监听和处理的?

Arbiter.start 方法中,我们提到了注册要处理的 signal,我们看下是如何处理的:

Arbiter.init_signals
1
2
3
4
5
6
7
8
9
10
11
def init_signals(self):
# close old PIPE

# initialize the pipe
self.PIPE = pair = os.pipe()
# config pipe

# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal)
signal.signal(signal.SIGCHLD, self.handle_chld)

可以看到先是创建了一个匿名管道保存了下来,之后向操作系统注册(绑定)了具体信号的处理函数,又加上了接收子进程状态变更的信号 SIGCHLD

gunicorn 的主进程监听的信号有:HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH 这几个,部分含义如下:

信号 可移植代号 全称 行为 含义
SIGHUP 1 Hang Up 终止进程 常用于通知进程重新加载配置,如 nginx 等
SIGINT 2 Interrupt 终止进程 用户发出退出信号(Ctrl+C),请求终止程序
SIGQUIT 3 Quit 终止进程 + Core Dump 用户发出退出信号(Ctrl+\),常用于调试
SIGTERM 15 Terminal 终止进程 要求进程优雅退出

当向服务发送一个信号时,比如用户按下 Ctrl+C 来退出服务,就会向服务发送一个 SIGINT 信号,之后操作系统会马上执行注册的 Arbiter.signal 方法:

Arbiter.signal
1
2
3
4
5
6
7
8
9
10
11
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()

def wakeup(self):
try:
os.write(self.PIPE[1], b'.')
except OSError as e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise

能看到在 signal 中,代码只是将信号添加到了队列里,之后调用 wakeup 往匿名管道里写了一个字节 b'.',这样做的结果是怎样的呢?我们看下之前提到的 sleep 方法:

Arbiter.sleep
1
2
3
4
5
6
7
8
9
10
11
def sleep(self):
try:
ready = select.select([self.PIPE[0]], [], [], 1.0)
if not ready[0]:
return
while os.read(self.PIPE[0], 1):
pass
except OSError as e:
# blah blah
except KeyboardInterrupt:
sys.exit()

在之前查看 Arbiter.run 方法时,我们提到每次循环当没有信号要处理时(SIG_QUEUE 队列是空的),主线程会 sleep 一秒,说的就是这个 select.select,它会监听一系列的文件描述符,当有任何一个准备好的时候就会返回,最后的 1.0 是超时时间,当监听的文件描述符都没有准备好时,等待超时后会返回空的元组。

所以在 sleep 方法中,会尝试读取 self.PIPE[0] 这个 fd,一旦读到内容,则会立即唤醒,否则就会等待一秒直到超时。

self.PIPE[0] 这个 fd,是在 init_signal 中创建的,与 self.PIPE[1] 是成对的,当往 self.PIPE[1] 中写入数据时,可以从 self.PIPE[0] 中读取到。

所以 Arbiter 针对信号的处理就完整了:

  1. 在服务启动时,注册要监听的信号,同时生成一个匿名管道用于唤醒 sleep 的线程
  2. 当有信号发出时,系统回调注册监听时绑定的函数 signal,将信号添加到队列中,同时向匿名管道发送一个字节唤醒线程进行处理
  3. 线程被唤醒,从管道中将通知发送的一个字节读出来,将管道清空,之后回到主循环
  4. 主循环中发现信号处理队列不是空的,开始信号处理流程,直到信号处理队列被消费完,再次开始 sleep

主循环中是这样处理信号的:

1
2
3
4
5
6
7
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
self.log.error("Unhandled signal: %s", signame)
continue
self.log.info("Handling signal: %s", signame)
handler()

根据信号名称拼出对应处理方法的名称 handle_%s,之后通过 getattr 拿到对应的处理方法对象,再对这个对象进行调用。

子进程的管理

上边我们讨论了 gunicorn 的主进程是如何处理信号的,那它是如何管理子进程的呢?

在主循环中,我们看到每次循环时,如果没有信号需要处理,服务会 sleep 一秒,之后就会进行 murder_workers

如果配置了超时时间,并且某个子进程(worker)超时未响应,就会给这个子进程发送一个 SIGABRT 信号通知子进程退出;如果已经通知过该子进程退出后,下次循环仍未退出,则直接发送 SIGKILL 信号,将该进程强制杀掉。之后主循环中会再调用 manage_workers,将缺少的 worker 补齐。

因为 init_signal 中添加了监听子进程状态变更的信号,当一个子进程退出时,会调用 reap_workers 方法,将对应的 worker 从子进程列表中移除。

结尾

至此,一个 gunicorn 服务从程序的入口,到配置的加载,应用的生成,端口的监听,子进程的创建以及信号的响应,到子进程的管理,基本上就完整了。

下一个章节我们将要讨论 gunicorn 的配置是如何处理的。