读源码-Gunicorn篇-4-worker

本节说明

上一节我们梳理了 gunicorn 是如何处理配置的,这一节我们来看 gunicorn 的工作进程是如何运行的。

分类

gunicorn 默认实现了 synceventletgeventtornadogthread 这几种类型 worker 的实现,含义如下:

名称 说明 异步 实现
sync 默认类型,每个worker同一时间只处理一个请求 workers.sync.SyncWorker
eventlet 基于 eventlet 实现,需要 monkey patch workers.geventlet.EventletWorker
gevent 基于 gevent 实现,需要 monkey patch workers.ggevent.GeventWorker
tornado 集成 Tornado 框架的事件循环机制,适合与 Tornado 应用结合使用 workers.gtornado.TornadoWorker
gthread 每个 worker 是一个进程,内部开启多个线程同时处理请求 workers.gthread.ThreadWorker

我们从最简单的默认的 sync 类型来看 gunicorn 是如何加载一个 worker 的。

加载

Arbiter.spawn_worker 方法中,有这样一行:

Arbiter.spawn_worker
1
2
3
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)

这里使用 Arbiter.worker_class 直接实例化了一个 worker 对象,说明 worker_class 其实是一个 Class 类,它是从哪里来的?

Arbiter.setup 中,有这样一行:

Arbiter.setup
1
self.worker_class = self.cfg.worker_class

可以看出它使用的是 Config 里配置的 worker_class。但是指定使用 worker 的配置项 --worker-class 是一个字符串啊,为什么这里可以直接当作一个类来实例化对象呢?

我们看 Config 类,它里面实现了一个 worker_class 方法:

Config.worker_class
1
2
3
4
5
6
7
8
9
10
11
12
13
@property
def worker_class(self):
uri = self.settings['worker_class'].get()

# are we using a threaded worker?
is_sync = isinstance(uri, str) and (uri.endswith('SyncWorker') or uri == 'sync')
if is_sync and self.threads > 1:
uri = "gunicorn.workers.gthread.ThreadWorker"

worker_class = util.load_class(uri)
if hasattr(worker_class, "setup"):
worker_class.setup()
return worker_class

这个方法通过 @property 进行了装饰,这样在调用 Arbiter.cfg.worker_class 时,其实是调用了这个方法,而不是像其他属性那样最后会到 Config.__getattr__ 中查找。而 worker_class 方法中通过指定的 worker 类型,调用 util.load_class 将对应的类对象加载了,所以 Arbiter.worker_class 就是一个类对象了,可以直接拿进行实例化对象。

创建

Arbiter 在实例化 worker 对象后,执行了 fork 系统调用,我们回顾一下这部分代码:

Arbiter.spawn_worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def spawn_worker(self):
# blah blah
pid = os.fork()
if pid != 0:
# blah blah

# blah blah

# Process Child
worker.pid = os.getpid()
try:
# blah blah
worker.init_process()
sys.exit(0)
except Exception:
# blah blah
sys.exit(-1)
finally:
# blah blah

worker 进程在创建后,执行了 worker.init_process 对子进程进行了初始化。

这里我们先看一下 SyncWorker 的结构:

classDiagram
    direction LR
    class Worker {
        - ...
        - pid
        - ppid
        - sockets
        - app
        - cfg
        - timeout
        + notify
        + init_process
        + init_signals
        + load_wsgi
        + handle_...
        + ...
    }

    class SyncWorker {
        - ...
        + accept
        + wait
        + run
        + run_for_one
        + run_for_multiple
        + handle
        + handle_request
    }

    Worker <|-- SyncWorker

这里又是一个典型的 模板方法 设计模式。

可以看到 init_process 其实是在 Worker 类中实现的。其实 Workergunicorn 中所有 worker 的基类,最终都会交给 Worker.init_process 来初始化进程。我们看一下这个方法的实现:

Worker.init_process
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def init_process(self):
# set environment' variables
# blah blah

# Reseed the random number generator
# blah blah

# For waking ourselves up
self.PIPE = os.pipe()
for p in self.PIPE:
util.set_non_blocking(p)
util.close_on_exec(p)

# Prevent fd inheritance
# blah blah

self.wait_fds = self.sockets + [self.PIPE[0]]

self.init_signals()

# start the reloader
# blah blah

self.load_wsgi()

# Enter main run loop
self.booted = True
self.run()

worker 进程中也是采用与主进程一样的办法,通过创建一个 PIPE 来唤醒进程,调用 init_signals 方法注册了信号处理方法,之后加载了 wsgi 对象,其实就是我们创建的 myapp:run

1
2
Worker.load_wsgi() -> BaseApplication.wsgi() -> WSGIApplication.load() ->
WSGIApplication.load_wsgiapp() -> util.import_app(app_uri)

之后调用了 Worker.run 方法。Worker 中并没有对 run 方法进行实现,而是交给了子类来处理。SyncWorker.run 中设置了一下默认的超时时长,将监听的 socket 配置为非阻塞,之后就根据监听的 socket 数量进入到处理单个还是多个 socket 的方法中,这里我们从简单的入手,看一下 run_for_one 方法:

SyncWorker.run_for_one
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def run_for_one(self, timeout):
listener = self.sockets[0]
while self.alive:
self.notify()

try:
self.accept(listener)
continue
except OSError as e:
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise

if not self.is_parent_alive():
return

try:
self.wait(timeout)
except StopWaiting:
return

这里就是子进程的主循环了!

与主进程的交互

在主循环中,我们看到在每次循环的开始和最后,分别调用了 notifywait 两个方法,作用如下:

  • notify : 更新时间,用于主进程判断子进程是否还活着
  • wait : 当没有请求需要处理时,等待一段时间,让出 cpu 时间

我们先看一下 wait 方法:

SyncWorker.wait
1
2
3
4
5
6
7
8
9
10
11
def wait(self, timeout):
try:
self.notify()
ret = select.select(self.wait_fds, [], [], timeout)
if ret[0]:
if self.PIPE[0] in ret[0]:
os.read(self.PIPE[0], 1)
return ret[0]

except OSError as e:
# blah blah

wait 方法很简单,与 Arbiter.sleep 方法类似,也是使用 select.select 来等待事件发生或超时,之后马上进入主循环进行处理。

notify 方法更简单,只有一行:

Worker.notify
1
2
def notify(self):
self.tmp.notify()

那么这一行代码干了什么呢?

我们看一下 Worker.tmp,它是 WorkerTmp 的一个对象,它创建了一个不会进行读写的临时文件的文件描述符 fd,当执行 notify 方法时,会更新这个 fd 的时间。

在主进程的 murder_workers 方法中,会依据这个时间,来判断子进程是否超时,代码如下:

Arbiter.murder_workers
1
2
3
4
5
6
7
8
9
10
11
12
13
for (pid, worker) in workers:
try:
if time.monotonic() - worker.tmp.last_update() <= self.timeout:
continue
except (OSError, ValueError):
continue

if not worker.aborted:
self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
worker.aborted = True
self.kill_worker(pid, signal.SIGABRT)
else:
self.kill_worker(pid, signal.SIGKILL)

能看到当 worker 长时间未更新时间(超时)后,主进程会通知子进程退出或杀掉子进程。

但是这个 fd 是子进程里的,主进程为什么可以访问呢?

在这里:

Worker.__init__
1
2
3
def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
# blah blah
self.tmp = WorkerTmp(cfg)

因为这个 fd 是在主进程中创建的,所以 os.fork 后子进程会继承这个 fd,之后两个进程就都可以访问这个 fd 了,实现了主进程与子进程的交互。

请求处理

回到 SyncWorker.run 方法,在主循环中,因为 listener 被设置为非阻塞,所以当没有请求进来时,会马上抛出异常,而这些异常会被忽略掉,接着会进入 wait 方法等待 0.5 秒,如果在 wait 时突然有请求进来了,wait 方法中的 select.select 马上就会返回,将线程唤醒,进入下一次循环去处理到来的请求。

SyncWorker.accept 方法中,我们看到在收到一个连接后,会交由 handle 方法进行处理,由于 handle 方法是同步的,新进来的连接请求会排队等待,等 handle 结束后再进行处理。

SyncWorker.handle
1
2
3
parser = http.RequestParser(self.cfg, client, addr)
req = next(parser)
self.handle_request(listener, req, client, addr)

可以看到在 handle 方法中创建了一个 http parser 来解析用户请求,之后就交给了 handle_request 来处理:

SyncWorker.handle_request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def handle_request(self, listener, req, client, addr):
environ = {}
resp = None
try:
self.cfg.pre_request(self, req)
request_start = datetime.now()
resp, environ = wsgi.create(req, client, addr,
listener.getsockname(), self.cfg)
resp.force_close()
# blah blah
respiter = self.wsgi(environ, resp.start_response)
try:
if isinstance(respiter, environ['wsgi.file_wrapper']):
resp.write_file(respiter)
else:
for item in respiter:
resp.write(item)
resp.close()
finally:
request_time = datetime.now() - request_start
self.log.access(resp, req, environ, request_time)
if hasattr(respiter, "close"):
respiter.close()
except Exception:
# blah blah
finally:
try:
self.cfg.post_request(self, req, environ, resp)
except Exception:
self.log.exception("Exception in post_request hook")

这里就是最终处理 http 请求的地方,可以看到在请求处理前先调用了配置的 pre_request 钩子方法,之后创建了响应对象和 WSGI 环境对象,设置响应对象完成后强制关闭链接,然后调用了 WSGI 应用,将用户请求交给了我们定义的 main:app,接收 WSGI 应用返回的迭代器,将内容写入响应对象,处理完成后,调用了配置的 post_request 钩子,完成用户请求的处理。

请求已经处理完成了,之后在 handle 方法中关闭了用户连接。

处理完成,回到主循环,等待下一个用户的请求。

至此。

在这个章节中,我们学习了一个 worker 是如何加载、创建的,如何与主进程交互,以及是如何处理用户请求的,在下一个章节,我们将要看一下一个 http 请求是如何被解析、处理的。

马上来。