读源码-Gunicorn篇-4-worker
本节说明
上一节我们梳理了 gunicorn
是如何处理配置的,这一节我们来看 gunicorn
的工作进程是如何运行的。
分类
gunicorn
默认实现了 sync
、eventlet
、gevent
、tornado
、gthread
这几种类型 worker
的实现,含义如下:
名称 | 说明 | 异步 | 实现 |
---|---|---|---|
sync | 默认类型,每个worker同一时间只处理一个请求 | 否 | workers.sync.SyncWorker |
eventlet | 基于 eventlet 实现,需要 monkey patch | 是 | workers.geventlet.EventletWorker |
gevent | 基于 gevent 实现,需要 monkey patch | 是 | workers.ggevent.GeventWorker |
tornado | 集成 Tornado 框架的事件循环机制,适合与 Tornado 应用结合使用 | 是 | workers.gtornado.TornadoWorker |
gthread | 每个 worker 是一个进程,内部开启多个线程同时处理请求 | 否 | workers.gthread.ThreadWorker |
我们从最简单的默认的 sync
类型来看 gunicorn
是如何加载一个 worker
的。
加载
在 Arbiter.spawn_worker
方法中,有这样一行:
1 | worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS, |
这里使用 Arbiter.worker_class
直接实例化了一个 worker 对象,说明 worker_class
其实是一个 Class
类,它是从哪里来的?
在 Arbiter.setup
中,有这样一行:
1 | self.worker_class = self.cfg.worker_class |
可以看出它使用的是 Config
里配置的 worker_class
。但是指定使用 worker
的配置项 --worker-class
是一个字符串啊,为什么这里可以直接当作一个类来实例化对象呢?
我们看 Config
类,它里面实现了一个 worker_class
方法:
1 |
|
这个方法通过 @property
进行了装饰,这样在调用 Arbiter.cfg.worker_class
时,其实是调用了这个方法,而不是像其他属性那样最后会到 Config.__getattr__
中查找。而 worker_class
方法中通过指定的 worker
类型,调用 util.load_class
将对应的类对象加载了,所以 Arbiter.worker_class
就是一个类对象了,可以直接拿进行实例化对象。
创建
Arbiter
在实例化 worker
对象后,执行了 fork
系统调用,我们回顾一下这部分代码:
1 | def spawn_worker(self): |
worker
进程在创建后,执行了 worker.init_process
对子进程进行了初始化。
这里我们先看一下 SyncWorker
的结构:
classDiagram
direction LR
class Worker {
- ...
- pid
- ppid
- sockets
- app
- cfg
- timeout
+ notify
+ init_process
+ init_signals
+ load_wsgi
+ handle_...
+ ...
}
class SyncWorker {
- ...
+ accept
+ wait
+ run
+ run_for_one
+ run_for_multiple
+ handle
+ handle_request
}
Worker <|-- SyncWorker
这里又是一个典型的 模板方法
设计模式。
可以看到 init_process
其实是在 Worker
类中实现的。其实 Worker
是 gunicorn
中所有 worker
的基类,最终都会交给 Worker.init_process
来初始化进程。我们看一下这个方法的实现:
1 | def init_process(self): |
在 worker
进程中也是采用与主进程一样的办法,通过创建一个 PIPE
来唤醒进程,调用 init_signals
方法注册了信号处理方法,之后加载了 wsgi
对象,其实就是我们创建的 myapp:run
:
1 | Worker.load_wsgi() -> BaseApplication.wsgi() -> WSGIApplication.load() -> |
之后调用了 Worker.run
方法。Worker
中并没有对 run
方法进行实现,而是交给了子类来处理。SyncWorker.run
中设置了一下默认的超时时长,将监听的 socket
配置为非阻塞,之后就根据监听的 socket
数量进入到处理单个还是多个 socket
的方法中,这里我们从简单的入手,看一下 run_for_one
方法:
1 | def run_for_one(self, timeout): |
这里就是子进程的主循环了!
与主进程的交互
在主循环中,我们看到在每次循环的开始和最后,分别调用了 notify
和 wait
两个方法,作用如下:
notify
: 更新时间,用于主进程判断子进程是否还活着wait
: 当没有请求需要处理时,等待一段时间,让出cpu
时间
我们先看一下 wait
方法:
1 | def wait(self, timeout): |
wait
方法很简单,与 Arbiter.sleep
方法类似,也是使用 select.select
来等待事件发生或超时,之后马上进入主循环进行处理。
notify
方法更简单,只有一行:
1 | def notify(self): |
那么这一行代码干了什么呢?
我们看一下 Worker.tmp
,它是 WorkerTmp
的一个对象,它创建了一个不会进行读写的临时文件的文件描述符 fd
,当执行 notify
方法时,会更新这个 fd
的时间。
在主进程的 murder_workers
方法中,会依据这个时间,来判断子进程是否超时,代码如下:
1 | for (pid, worker) in workers: |
能看到当 worker
长时间未更新时间(超时)后,主进程会通知子进程退出或杀掉子进程。
但是这个 fd
是子进程里的,主进程为什么可以访问呢?
在这里:
1 | def __init__(self, age, ppid, sockets, app, timeout, cfg, log): |
因为这个 fd
是在主进程中创建的,所以 os.fork
后子进程会继承这个 fd
,之后两个进程就都可以访问这个 fd
了,实现了主进程与子进程的交互。
请求处理
回到 SyncWorker.run
方法,在主循环中,因为 listener
被设置为非阻塞,所以当没有请求进来时,会马上抛出异常,而这些异常会被忽略掉,接着会进入 wait
方法等待 0.5 秒,如果在 wait
时突然有请求进来了,wait
方法中的 select.select
马上就会返回,将线程唤醒,进入下一次循环去处理到来的请求。
在 SyncWorker.accept
方法中,我们看到在收到一个连接后,会交由 handle
方法进行处理,由于 handle
方法是同步的,新进来的连接请求会排队等待,等 handle
结束后再进行处理。
1 | parser = http.RequestParser(self.cfg, client, addr) |
可以看到在 handle
方法中创建了一个 http parser
来解析用户请求,之后就交给了 handle_request
来处理:
1 | def handle_request(self, listener, req, client, addr): |
这里就是最终处理 http
请求的地方,可以看到在请求处理前先调用了配置的 pre_request
钩子方法,之后创建了响应对象和 WSGI
环境对象,设置响应对象完成后强制关闭链接,然后调用了 WSGI
应用,将用户请求交给了我们定义的 main:app
,接收 WSGI
应用返回的迭代器,将内容写入响应对象,处理完成后,调用了配置的 post_request
钩子,完成用户请求的处理。
请求已经处理完成了,之后在 handle
方法中关闭了用户连接。
处理完成,回到主循环,等待下一个用户的请求。
至此。
在这个章节中,我们学习了一个 worker
是如何加载、创建的,如何与主进程交互,以及是如何处理用户请求的,在下一个章节,我们将要看一下一个 http
请求是如何被解析、处理的。
马上来。