读源码-Gunicorn篇-5-协议

本节说明

本节的标题是——协议,有两层意思,一个是用户请求的协议 HTTP,另一个是 gunicorn 解析请求后,与我们的应用交互的 WSGI 协议,本章节将分两个部分分别讨论。

开始前,我们先回顾一下 HTTP 协议。

HTTP 协议

HTTP 请求

一个完整的 HTTP 请求包含四个部分:

  • 请求行:请求方法 + URL + HTTP 版本
  • 请求头:包含各种元数据信息
  • 空行:分隔头部和消息体
  • 请求体:包含要发送的数据(可选)

示例:

1
2
3
4
5
6
GET /hello?a=123 HTTP/1.1
Host: localhost:8000
User-Agent: curl/8.7.1
Accept: */*
Proxy-Connection: Keep-Alive

HTTP 响应

一个完整的 HTTP 响应结构也包含四个部分:

  • 状态行:HTTP 版本 + 状态码 + 状态描述
  • 响应头:服务器返回的元数据信息
  • 空行:分隔响应头和响应体
  • 响应体:实际返回的数据内容(可选)

示例:

1
2
3
4
5
6
7
8
9
10
HTTP/1.1 200 OK
Content-Length: 13
Connection: keep-alive
Content-Type: text/plain
Date: Mon, 26 May 2025 07:19:15 GMT
Keep-Alive: timeout=4
Proxy-Connection: keep-alive
Server: gunicorn

Hello World!

HTTP 请求和响应中,除了请求体和响应体部分,其它的可以看作是纯文本的内容。

参照上边的例子,我们来看一下一个用户请求是如何解析交给 WSGI 应用的。

解析请求

我们回到上一节请求接收的部分:

SyncWorker.handle
1
2
3
parser = http.RequestParser(self.cfg, client, addr)
req = next(parser)
self.handle_request(listener, req, client, addr)

在收到请求后,先是创建了一个 parser 对象:

RequestParser.__init__
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def __init__(self, cfg, source, source_addr):
self.cfg = cfg
self.unreader = SocketUnreader(source)
self.mesg = None
self.source_addr = source_addr
# request counter (for keepalive connetions)
self.req_count = 0

def __next__(self):
# Stop if HTTP dictates a stop.
# Discard any unread body of the previous message
# Parse the next request
self.req_count += 1
self.mesg = Request(self.cfg, self.unreader, self.source_addr, self.req_count)
return self.mesg

将连接的 client socket 交给 SocketUnreader 生成了一个 unreader,之后调用了 __next__ 方法生成了 Request 对象,那么这个 Request 对象在创建时都做了什么呢?

Request.__init__
1
2
3
def __init__(self, cfg, unreader, peer_addr, req_number=1):
# blah blah
super().__init__(cfg, unreader, peer_addr)

它初始化了一些属性后,将创建工作交给了父类 Message

Message.__init__
1
2
3
4
5
6
def __init__(self, cfg, unreader, peer_addr):
# blah blah

unused = self.parse(self.unreader)
self.unreader.unread(unused)
self.set_body_reader()

父类也是初始化了一些属性后,又调回了 Requestparse 方法来解析:

Request.parse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def parse(self, unreader):
buf = io.BytesIO()
self.get_data(unreader, buf, stop=True)

# get request line
line, rbuf = self.read_line(unreader, buf, self.limit_request_line)

# handle proxy protocol
# blah blah

self.parse_request_line(line)
buf = io.BytesIO()
buf.write(rbuf)

# Headers
# 处理 Header 头是空的,或数据未完整接收的情况

self.headers = self.parse_headers(data[:idx], from_trailer=False)

ret = data[idx + 4:]
buf = None
return ret

parse 方法中,先是通过 unreader(SocketUnreader) 将请求数据读取到内存 buf 中,然后取出了第一行的数据 line 和后续的数据部分 rbuf

在前边 HTTP 请求的示例中,我们可以看到,第一行的内容是:GET /hello?a=123 HTTP/1.1gunicorn 是这样处理的:

Request.parse_request_line
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def parse_request_line(self, line_bytes):
bits = [bytes_to_str(bit) for bit in line_bytes.split(b" ", 2)]
if len(bits) != 3:
raise InvalidRequestLine(bytes_to_str(line_bytes))

# Method: RFC9110 Section 9
self.method = bits[0]

# blah blah

# URI
self.uri = bits[1]

# blah blah

try:
parts = split_request_uri(self.uri)
except ValueError:
raise InvalidRequestLine(bytes_to_str(line_bytes))
self.path = parts.path or ""
self.query = parts.query or ""
self.fragment = parts.fragment or ""

# Version
match = VERSION_RE.fullmatch(bits[2])
if match is None:
raise InvalidHTTPVersion(bits[2])
self.version = (int(match.group(1)), int(match.group(2)))
# blah blah

将第一行数据按空格拆分开,之后第一部分作为请求方法存了起来,然后是 uri 部分,使用 urllib 解析后保存了 pathqueryfragment,然后使用正则取出了协议的版本,在我们这个例子里是 1.1

之后回到 parse 方法中,开始处理 Headers。先是处理了空请求头或请求头没有完整接收完的情况,之后交给了 parse_headersHeaders 部分进行解析:

Message.parse_headers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def parse_headers(self, data, from_trailer=False):
cfg = self.cfg
headers = []

# Split lines on \r\n
lines = [bytes_to_str(line) for line in data.split(b"\r\n")]

# handle scheme headers
# blah blah

# Parse headers into key/value pairs paying attention
# to continuation lines.
while lines:
if len(headers) >= self.limit_request_fields:
raise LimitRequestHeaders("limit request headers fields")

# Parse initial header name: value pair.
curr = lines.pop(0)
header_length = len(curr) + len("\r\n")
if curr.find(":") <= 0:
raise InvalidHeader(curr)
name, value = curr.split(":", 1)
if self.cfg.strip_header_spaces:
name = name.rstrip(" \t")
if not TOKEN_RE.fullmatch(name):
raise InvalidHeaderName(name)

# this is still a dangerous place to do this
# but it is more correct than doing it before the pattern match:
# after we entered Unicode wonderland, 8bits could case-shift into ASCII:
# b"\xDF".decode("latin-1").upper().encode("ascii") == b"SS"
name = name.upper()

value = [value.strip(" \t")]

# Consume value continuation lines..
while lines and lines[0].startswith((" ", "\t")):
# .. which is obsolete here, and no longer done by default
if not self.cfg.permit_obsolete_folding:
raise ObsoleteFolding(name)
curr = lines.pop(0)
header_length += len(curr) + len("\r\n")
if header_length > self.limit_request_field_size > 0:
raise LimitRequestHeaders("limit request headers "
"fields size")
value.append(curr.strip("\t "))
value = " ".join(value)

# 处理一些安全性的问题

headers.append((name, value))

return headers

parse_headers 中,可以看到按 \r\n 将请求头部分拆分成行,之后对每一行进行遍历,按 : 截成请求头名称和值部分,处理空白的部分,将请求头名称转成了大写。

之后又出现了一个 while 循环,这里是为了兼容一个 header 有多行值的问题。

再进行一些安全性问题的处理后,按 (name, value) 列表的形式返回了所有的 Header 头,请求头部分的解析就完成了。

回到 parse 方法,将返回的 headers 保存后,截掉四个字节的 \r\n\r\n,把后续的 body 作为剩余部分返回给了 Request 的初始化方法。

Message.__init__ 方法按请求头的 CONTENT-LENGTHTRANSFER-ENCODING 的内容,将剩余部分封装为一个 Body 对象,之后就返回了。

至此,一个 Request 对象就创建完成了。

处理请求

回到 SyncWorker 处理请求的地方,可以看到创建完成 Request 对象后,就到了请求具体处理的地方了:

SyncWorker.handle_request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def handle_request(self, listener, req, client, addr):
environ = {}
resp = None
try:
# do pre_request

request_start = datetime.now()
resp, environ = wsgi.create(req, client, addr,
listener.getsockname(), self.cfg)
# blah blah

respiter = self.wsgi(environ, resp.start_response)
try:
if isinstance(respiter, environ['wsgi.file_wrapper']):
resp.write_file(respiter)
else:
for item in respiter:
resp.write(item)
resp.close()
finally:
request_time = datetime.now() - request_start
self.log.access(resp, req, environ, request_time)
if hasattr(respiter, "close"):
respiter.close()
except Exception:
# blah blah
finally:
# do post_request

在请求真正被处理前,gunicorn 通过 wsgi.create 已解析的 Request 对象创建了 environ 词典,以及包装的 Response 对象,之后将对用户请求的处理控制交给了 wsgi,也就是我们定义的 main:app 应用,接收处理的结果,再将处理的结果返回给客户端。

到这里,一个完整的请求就处理完成了。

但是,gunicorn 传递给我们的应用都有哪些内容呢?或者说,一个 WSGI 服务器是如何调用一个 WSGI 应用,WSGI 应用又怎么知道服务器交给自己的都是些什么呢?

这就是 WSGI 协议规定的内容,我们看一下。

WSGI 协议

WSGI 协议由 PEP-3333 定义,简单来说:

一个 WSGI 应用,需要是一个可以接收两个参数 (environ, start_response) 的可调用对象:

result = application(environ, start_response)

其中:

  • environ 必须是一个 Python 内置的 dict 对象,包含 CGI 风格的环境变量
  • start_response 必须是一个包含 3 个参数的可调用对象 (status, response_headers, exc_info),其中前两个参数是必须的,第三个参数为可选
    • status: 状态码字符串
    • response_headers: 一个元素格式为 (name, value) 元组的列表
    • exc_info: 在异常处理时调用 start_response 才需要指定,如果指定了则必须是一个 Pythonsys.exc_info() 元组
  • start_response 必须返回一个 write(body_data) 格式的可调用对象,接收一个字节字符串的参数 body_data,作为响应 body 的一部分
  • application 返回的结果 result 必须是一个可返回 0 或多个字节字符串的无缓冲的可迭代对象

协议针对 environ 对象也做了要求:

必须包含的 CGI 变量,部分变量可以为空,但必须存在:

名称 说明 是否可以为空 是否可缺省
REQUEST_METHOD 请求方法,如 GETPOST
SCRIPT_NAME 脚本名称
PATH_INFO 路径信息,比如 /hello
QUERY_STRING 查询字符串,
CONTENT_TYPE 请求内容类型
CONTENT_LENGTH 请求内容长度
SERVER_NAME, SERVER_PORT 服务器名称和端口,需要成对出现
SERVER_PROTOCOL 服务器协议
HTTP_* HTTP 请求头

必须包含的 wsgi 变量:

名称 说明
wsgi.version WSGI 版本,必须是 (1,0)
wsgi.url_scheme URL 协议方案,http 或是 https
wsgi.input 输入流,用于读取请求体
wsgi.errors 错误流,用于写入错误信息
wsgi.multithread 多线程标志,TrueFalse,表示是否在多线程环境中运行
wsgi.multiprocess 多进程标志,TrueFalse,表示是否在多进程环境中运行
wsgi.run_once 是否期望应用只运行一次,TrueFalse

基本上就是这些了。

尾声

到这里,我们分五个部分,针对 gunicorn 项目,分别搭建了调试环境,研究了服务的启动、进程的管理、配置的加载、worker 的运行以及协议的解析等,整个 gunicorn 相关的处理流程也基本上覆盖完整了,我们读源码系列 gunicorn 相关的部分就要先告一段落了。

后续可能还会有一些剩余相关的内容,比如:

  • 异步 worker 的处理
  • 异常是如何处理的
  • 响应是如何处理的
  • 请求/响应中如果包含文件是如何处理的

等等,后续有机会了再补充吧。

下面还有两个系列在准备中了,一个是 Java 相关的 Dubbo 项目,还有一个还是 Python 相关的异步框架 uvicorn+FastAPI 结合着一起看。

uvicorn+FastAPI 的准备得快差不多了,但可能还是会先出 Dubbo 的。

嗯,就这样。