[译] Diy Async Web Framework (hzlmn, 2020)

AsyncIO

TOC

Logo

简介

近几年来,异步编程在 Python 社区中变得越来越受欢迎。诸如aiohttp之类的异步库,在使用量上呈现出惊人的增长态势,因为它们能够并发处理大量链接,并在此基础上保持代码的可读性与简洁程度。而就在不久前,Django 也承诺将在下个大版本中增加对异步的支持。种种迹象都表明,Python 的异步编程拥有非常不错的前景。然而,对于很大一部分习惯于使用标准阻塞模型的开发人员来说,这些异步工具的工作机制显得十分令人困惑。因此,我将在这份简短的指南中从零构建一个简化版的aiohttp,并通过这种方式深入幕后,理清 Python 异步编程的工作过程。我们将从官方文档中的一个基本示例出发,并逐步增加我们所感兴趣的必要功能。让我们立刻开始吧!

在这篇指南中,我将假设你已经对asyncio有了最基本的了解。如果你需要回顾一些相关知识的话,这几篇文章或许能帮到你:

当然,如果你已经等不及了,可以直接在这里找最终的源码:hzlmn/sketch

相关项目

Asyncio 库的低层级 API:Transports 与 Protocols

Asyncio库经过了漫长的演变才成为现在这个样子。曾经的 asyncio 是作为一个名为“tulip”的底层工具而被创造出来的。那个时候,开发高层级的应用程序可不像今天这么愉快。

在如今大多数情况下,asyncio都被作为一种高层级的 API 来使用,不过该库也提供了一些低层级的助手来供那些库的设计者管理事件循环,以及实现网络或进程间的通信协议。

Asyncio库仅为TCP, UDP, SSL 以及子进程提供了开箱即用的支持。而其他异步库则基于asyncio库所提供基础传输与编程接口实现了它们所需的更高层级的协议,如HTTPFTP等等。

所有通信都是通过链接 Transports 和 Protocols 来完成的。简单地说,Transports 描述了我们该如何传送数据,而 Protocols 负责决定传送哪些数据。

关于 Transports 与 Protocols,asyncio库提供了一份非常棒的官方文档,你可以在这里访问它,并进行更深入的了解。

作为项目的第一步,让我们先来编写一个简单的TCP回显服务器。

server.py

 1import asyncio
 2
 3class Server(asyncio.Protocol):
 4    def connection_made(self, transport):
 5        self._transport = transport
 6
 7    def data_received(self, data):
 8        message = data.decode()
 9
10        self._transport.write(data)
11
12        self._transport.close()
13
14loop = asyncio.get_event_loop()
15
16coro = loop.create_server(Server, '127.0.0.1', 8080)
17server = loop.run_until_complete(coro)
18
19try:
20    loop.run_forever()
21except KeyboardInterrupt:
22    pass
23
24server.close()
25loop.run_until_complete(server.wait_closed())
26loop.close()
1$ curl http://127.0.0.1:8080
2GET / HTTP/1.1
3Host: 127.0.0.1:8080
4User-Agent: curl/7.54.0
5Accept: */*

从以上示例可以看出,构建异步服务器程序的代码非常简单。不过如果你想构建一个更高层级的应用程序,仅凭这些还不太够。

由于 HTTP 协议工作在 TCP 协议之上,我们现在已经可以向我们的服务器程序发送 HTTP 请求了。然而,接收并使用未经格式化处理的 HTTP报文显然是非常困难的。所以我们下一步的工作就是去增加一种更好的 HTTP 处理机制。

在服务器程序上实现协议

让我们为服务器程序增加一个解析 HTTP请求的功能,这样我们就可以提取并使用请求头、请求正文以及请求路径等信息。如何解析 HTTP请求是一个非常复杂的话题,这远远超出了本指南所研究的范围,因此我们将直接使用httptools来解析请求。httptools是一个效率高,兼容性好,并且相当灵活的HTTP解析器。

此外,aiohttp项目也实现了一个基于 Python 的HTTP解析器,并且这个解析器已经被集成到了 Node 的 http-parser中。

接下来,我们需要实现一个用来与服务器类组合的解析器类。

http_parser.py

 1class HttpParserMixin:
 2    def on_body(self, data):
 3        self._body = data
 4
 5    def on_url(self, url):
 6        self._url = url
 7
 8    def on_message_complete(self):
 9        print(f"Received request to {self._url.decode(self._encoding)}")
10
11    def on_header(self, header, value):
12        header = header.decode(self._encoding)
13        self._headers[header] = value.decode(self._encoding)

实现解析器类 HttpParserMixin后,将它与我们的 Server 类组合到一起。

server.py

 1import asyncio
 2
 3from httptools import HttpRequestParser
 4
 5from .http_parser import HttpParserMixin
 6
 7class Server(asyncio.Protocol, HttpParserMixin):
 8    def __init__(self, loop):
 9        self._loop = loop
10        self._encoding = "utf-8"
11        self._url = None
12        self._headers = {}
13        self._body = None
14        self._transport = None
15        self._request_parser = HttpRequestParser(self)
16
17    def connection_made(self, transport):
18        self._transport = transport
19
20    def connection_lost(self, *args):
21        self._transport = None
22
23    def data_received(self, data):
24        # Pass data to our parser
25        self._request_parser.feed_data(data)

现在,我们终于拥有了一个能够解析传入的 HTTP 请求,并从中提取重要信息的服务器。让我们把它运行起来。

server.py

 1if __name__ == "__main__":
 2    loop = asyncio.get_event_loop()
 3    serv = Server(loop)
 4    server = loop.run_until_complete(loop.create_server(lambda: serv, port=8080))
 5
 6    try:
 7        print("Started server on ::8080")
 8        loop.run_until_complete(server.serve_forever())
 9    except KeyboardInterrupt:
10        server.close()
11        loop.run_until_complete(server.wait_closed())
12        loop.stop()
1> python server.py
2Started server on ::8080
1> curl http://127.0.0.1:8080/hello

Request/Response对象

目前,我们已经拥有了一个可以解析 HTTP请求的服务器程序。但为了构建应用程序,我们还需要在某些方面做进一步的抽象。

现在让我们来创建一个用于将所有 HTTP 请求信息组合到一起的 Request 类。请确保已经安装了 yarl 库,我们将使用它来处理 url。

request.py

 1import json
 2
 3from yarl import URL
 4
 5class Request:
 6    _encoding = "utf_8"
 7
 8    def __init__(self, method, url, headers, version=None, body=None, app=None):
 9        self._version = version
10        self._method = method.decode(self._encoding)
11        self._url = URL(url.decode(self._encoding))
12        self._headers = headers
13        self._body = body
14
15    @property
16    def method(self):
17        return self._method
18
19    @property
20    def url(self):
21        return self._url
22
23    @property
24    def headers(self):
25        return self._headers
26
27    def text(self):
28        if self._body is not None:
29            return self._body.decode(self._encoding)
30
31    def json(self):
32        text = self.text()
33        if text is not None:
34            return json.loads(text)
35
36    def __repr__(self):
37        return f"<Request at 0x{id(self)}>"

下一步,我们还需要这样一个结构:它能帮助我们以程序员友好的方式描述 HTTP 响应,并将其转化为原始的 HTTP 报文。这种转化后的报文可以通过 asyncio.Transport处理。

response.py

 1import http.server
 2
 3web_responses = http.server.BaseHTTPRequestHandler.responses
 4
 5class Response:
 6    _encoding = "utf-8"
 7
 8    def __init__(
 9        self,
10        body=None,
11        status=200,
12        content_type="text/plain",
13        headers=None,
14        version="1.1",
15    ):
16        self._version = version
17        self._status = status
18        self._body = body
19        self._content_type = content_type
20        if headers is None:
21            headers = {}
22        self._headers = headers
23
24    @property
25    def body(self):
26        return self._body
27
28    @property
29    def status(self):
30        return self._status
31
32    @property
33    def content_type(self):
34        return self._content_type
35
36    @property
37    def headers(self):
38        return self._headers
39    
40    def add_body(self, data):
41        self._body = data
42
43    def add_header(self, key, value):
44        self._headers[key] = value
45    
46    def __str__(self):
47        """We will use this in our handlers, it is actually generation of raw HTTP response,
48        that will be passed to our TCP transport
49        """
50        status_msg, _ = web_responses.get(self._status)
51        
52        messages = [
53            f"HTTP/{self._version} {self._status} {status_msg}",
54            f"Content-Type: {self._content_type}",
55            f"Content-Length: {len(self._body)}",
56        ]
57
58        if self.headers:
59            for header, value in self.headers.items():
60                messages.append(f"{header}: {value}")
61
62        if self._body is not None:
63            messages.append("\r\n" + self._body)
64
65        return "\r\n".join(messages)
66
67    def __repr__(self):
68        return f"<Response at 0x{id(self)}>"

如上所示,代码非常简单。我们封装了所有的数据,并为其属性定义了对应的 getter 方法。我们还定义了一些之后要用到的助手方法,用于处理 text 以及 json格式的报文体。接下来的任务就是更新一下服务器程序,使之能够通过接收到的消息来创建 Request 对象。

Request 对象应当在解析完整个请求后创建,因此我们把创建工作添加到解析器类的 on_message_complete事件的处理方法中。

http_parser.py

 1class HttpParserMixin:
 2    ...
 3
 4    def on_message_complete(self):
 5        self._request = self._request_class(
 6            version=self._request_parser.get_http_version(),
 7            method=self._request_parser.get_method(),
 8            url=self._url,
 9            headers=self._headers,
10            body=self._body,
11        )
12
13    ...

Server类也需要改造一下,使之能够创建 Response 对象,并将编码后的消息传递给asyncio.Transport

server.py

 1from .response import Response
 2...
 3
 4class Server(asyncio.Protocol, HttpParserMixin):
 5    ...
 6
 7    def __init__(self, loop):
 8        ...
 9        self._request = None
10        self._request_class = Request
11
12    ...
13
14    def data_received(self, data):
15        self._request_parser.feed_data(data)
16
17        resp = Response(body=f"Received request on {self._request.url}")
18        self._transport.write(str(resp).encode(self._encoding))
19
20        self._transport.close()

现在再去运行 server.py,我们就可以使用 curl 去请求http://localhost:8080/path,并在响应中看到 Received request on /path了。

Application 与 UrlDispatcher

现阶段,我们已经拥有了能够解析HTTP请求的服务器,以及能够处理请求周期的 Request/Response 对象。然而,我们这个手写的工具包中还缺少一些重要的概念。首先,我们现在只有一个主请求处理器,而在大型的应用程序中,我们需要很多请求处理器来处理不同的路由。因此我们还需要一种机制来为不同路由分别注册处理程序。

现在让我们用内置的字典来实现一个尽可能简单的 UrlDispatcher。该字典的键是一个由请求方法与请求路径组成的二元组,而值是一个处理程序。此外我们还需要一个单独的处理程序去处理那些无法识别路由的请求。

router.py

 1from .response import Response
 2
 3class UrlDispatcher:
 4    def __init__(self):
 5        self._routes = {}
 6
 7    async def _not_found(self, request):
 8         return Response(f"Not found {request.url} on this server", status=404)
 9
10    def add_route(self, method, path, handler):
11        self._routes[(method, path)] = handler
12
13    def resolve(self, request):
14        key = (request.method, request.url.path)
15        if key not in self._routes:
16            return self._not_found
17        return self._routes[key]

当然,我们还缺少很多别的东西,比如参数化的路由等等。我们会在之后增加它们,现在还是让程序尽可能保持简单吧。

直接与底层的 Server 进行交互是非常麻烦的,所以,接下来我们需要一个Applicatio 容器,用来组合所有与应用相关的信息。

 1import asyncio
 2
 3from .router import UrlDispatcher
 4from .server import Server
 5from .response import Response
 6
 7class Application:
 8    def __init__(self, loop=None):
 9        if loop is None:
10            loop = asyncio.get_event_loop()
11
12        self._loop = loop
13        self._router = UrlDispatcher()
14
15    @property
16    def loop(self):
17        return self._loop
18
19    @property
20    def router(self):
21        return self._router
22
23    def _make_server(self):
24        return Server(loop=self._loop, handler=self._handler, app=self)
25
26    async def _handler(self, request, response_writer):
27        """Process incoming request"""
28        handler = self._router.resolve(request)
29        resp = await handler(request)
30
31        if not isinstance(resp, Response):
32            raise RuntimeError(f"expect Response instance but got {type(resp)}")
33
34        response_writer(resp)

我们需要对 Server 稍加修改,并增加一个 response_writer 方法来将数据传送给 transport。同时,我们需要在 Server 的构造函数中增加 handler 属性和 app属性。这些属性将被用来调用相应的处理程序。

server.py

 1class Server(asyncio.Protocol, HttpParserMixin):
 2    ...
 3
 4    def __init__(self, loop, handler, app):
 5        self._loop = loop
 6        self._url = None
 7        self._headers = {}
 8        self._body = None
 9        self._transport = None
10        self._request_parser = HttpRequestParser(self)
11        self._request = None
12        self._request_class = Request
13        self._request_handler = handler
14        self._request_handler_task = None
15
16    def response_writer(self, response):
17        self._transport.write(str(response).encode(self._encoding))
18        self._transport.close()
19    
20    ...

http_parser.py

 1class HttpParserMixin:
 2    def on_body(self, data):
 3        self._body = data
 4
 5    def on_url(self, url):
 6        self._url = url
 7
 8    def on_message_complete(self):
 9        self._request = self._request_class(
10            version=self._request_parser.get_http_version(),
11            method=self._request_parser.get_method(),
12            url=self._url,
13            headers=self._headers,
14            body=self._body,
15        )
16
17        self._request_handler_task = self._loop.create_task(
18            self._request_handler(self._request, self.response_writer)
19        )
20
21    def on_header(self, header, value):
22        header = header.decode(self._encoding)
23        self._headers[header] = value.decode(self._encoding)

终于,我们完成了基本功能的开发,并且可以注册新的路由和处理程序了。接下来,我们要写一个简单的助手方法来运行我们的应用实例(就像 aiohttp中的 web.run_app)。

application.py

 1def run_app(app, host="127.0.0.1", port=8080, loop=None):
 2    if loop is None:
 3        loop = asyncio.get_event_loop()
 4
 5    serv = app._make_server()
 6    server = loop.run_until_complete(
 7        loop.create_server(lambda: serv, host=host, port=port)
 8    )
 9
10    try:
11        print(f"Started server on {host}:{port}")
12        loop.run_until_complete(server.serve_forever())
13    except KeyboardInterrupt:
14        server.close()
15        loop.run_until_complete(server.wait_closed())
16        loop.stop()

现在,是时候用我们新开发的工具包来创建简单的应用程序了。

app.py

 1import asyncio
 2
 3from .response import Response
 4from .application import Application, run_app
 5
 6app = Application()
 7
 8async def handler(request):
 9    return Response(f"Hello at {request.url}")
10
11app.router.add_route("GET", "/", handler)
12
13if __name__ == "__main__":
14    run_app(app)

如果你已经运行了程序,并向 /发送了一个 GET 请求,就可以看到 Hello at /响应。同时,如果你访问其他路由,则会收到一个 404响应。

1$ curl 127.0.0.1:8080/
2Hello at /
3
4$ curl 127.0.0.1:8080/invalid
5Not found /invalid on this server

不错,我们终于完成了!但不得不说,这个项目还有很多需要改进的地方。

更进一步

到目前为止,我们已经开发并运行了所有的基本功能,但我们的“框架”中的某些东西还有待改进。首先,正如之前提到过的,我们的路由程序缺少参数化路由的功能,这是所有现代的框架都必须具有的特性。然后我们需要添加对中间件的支持,这也是十分常见,并且非常强大的概念。此外,在aiohttp的炫酷特性中,应用的生命周期钩子深得我喜爱(如on_startup, on_shutdown, on_cleanup),所以我们也应当尝试着去实现它。

路由参数

目前我们的 UrlDispatcher非常精简,它把被注册的 url 路径当作字符串来处理。我们首先要做的是在resolve方法中添加对/user/{username}等模式的支持。同时,我们还需要一个_format_pattern 助手方法,该方法可以从参数化字符串生成实际的正则表达式。也许你已经注意到了,我们还定义了_method_not_allowed 助手方法,以及另外几个用来处理 GET, POST等简单路由的方法。

router.py

 1import re
 2
 3from functools import partialmethod
 4
 5from .response import Response
 6
 7class UrlDispatcher:
 8    _param_regex = r"{(?P<param>\w+)}"
 9
10    def __init__(self):
11        self._routes = {}
12
13    async def _not_found(self, request):
14        return Response(f"Could not find {request.url.raw_path}")
15
16    async def _method_not_allowed(self, request):
17        return Response(f"{request.method} not allowed for {request.url.raw_path}")
18
19    def resolve(self, request):
20        for (method, pattern), handler in self._routes.items():
21            match = re.match(pattern, request.url.raw_path)
22
23            if match is None:
24                return None, self._not_found
25
26            if method != request.method:
27                return None, self._method_not_allowed
28
29            return match.groupdict(), handler
30
31    def _format_pattern(self, path):
32        if not re.search(self._param_regex, path):
33            return path
34
35        regex = r""
36        last_pos = 0
37
38        for match in re.finditer(self._param_regex, path):
39            regex += path[last_pos: match.start()]
40            param = match.group("param")
41            regex += r"(?P<%s>\w+)" % param
42            last_pos = match.end()
43
44        return regex
45
46    def add_route(self, method, path, handler):
47        pattern = self._format_pattern(path)
48        self._routes[(method, pattern)] = handler
49
50    add_get = partialmethod(add_route, "GET")
51
52    add_post = partialmethod(add_route, "POST")
53
54    add_put = partialmethod(add_route, "PUT")
55
56    add_head = partialmethod(add_route, "HEAD")
57
58    add_options = partialmethod(add_route, "OPTIONS")

我们还需要改造一下Applicatio 容器,使UrlDispatcher resolve 方法能够返回match_info 以及对应的handler 。修改 Application._handler 中的以下几行。

application.py

1class Application:
2    ...
3    async def _handler(self, request, response_writer):
4        """Process incoming request"""
5        match_info, handler = self._router.resolve(request)
6
7        request.match_info = match_info
8            
9        ...

中间件

可能有些读者会对中间件这个概念感到陌生。简单来说,中间件是一个协程,且该协程会在请求到达服务器之前启动,并修改传入处理程序的 Request对象,或修改处理程序生成的 Response对象。我们的需求实现起来非常简单。首先,我们要在Application 对象中添加一个用于注册中间件的列表,并修改 Application._handler 来运行这些中间件。注意,每个中间件的运行都要基于前一个中间件的工作结果,而不是基于最初的处理程序的工作结果。

application.py

 1from functools import partial
 2...
 3
 4class Application:
 5    def __init__(self, loop=None, middlewares=None):
 6        ...
 7        if middlewares is None:
 8            self._middlewares = []
 9
10    ...
11
12    async def _handler(self, request, response_writer):
13        """Process incoming request"""
14        match_info, handler = self._router.resolve(request)
15        
16        request.match_info = match_info
17
18        if self._middlewares:
19            for md in self._middlewares:
20                handler = partial(md, handler=handler)
21
22        resp = await handler(request)
23
24        ...

然后,为我们的应用程序添加一个请求日志中间件。

app.py

 1import asyncio
 2
 3from .response import Response
 4from .application import Application, run_app
 5
 6async def log_middleware(request, handler):
 7    print(f"Received request to {request.url.raw_path}")
 8    return await handler(request)
 9
10app = Application(middlewares=[log_middleware])
11
12async def handler(request):
13    return Response(f"Hello at {request.url}")
14
15app.router.add_route("GET", "/", handler)
16
17if __name__ == "__main__":
18    run_app(app)

现在再运行这个程序,我们就可以看到每个请求所对应的 Received request to / 消息了。

App 的生命周期钩子

下一步我们需要添加一些功能,使得应用程序可以在服务启动、服务停止等事件发生时执行对应的协程。这也是 aiohttp所拥有的一项非常灵巧的特性。可以处理的信号非常多,例如 on_startupon_shutdownon_response_prepared 等等。但是我们想让程序尽可能保持简洁,因此只要实现startupshutdown即可。

我们要先在 Application 内部为每个事件设置一个列表,用来添加各自的处理程序,并将其封装为属性,提供对应的 getter。然后我们要编写实际的 startupshutdown 协程,并在 run_app增加相应的调用。

application.py

 1class Application:
 2    def __init__(self, loop=None, middlewares=None):
 3        ...
 4        self._on_startup = []
 5        self._on_shutdown = []
 6
 7    ... 
 8
 9    @property
10    def on_startup(self):
11        return self._on_startup
12
13    @property
14    def on_shutdown(self):
15        return self._on_shutdown
16
17    async def startup(self):
18        coros = [func(self) for func in self._on_startup]
19        await asyncio.gather(*coros, loop=self._loop)
20
21    async def shutdown(self):
22        coros = [func(self) for func in self._on_shutdown]
23        await asyncio.gather(*coros, loop=self._loop)
24
25    ...
26
27def run_app(app, host="127.0.0.1", port=8080, loop=None):
28    if loop is None:
29        loop = asyncio.get_event_loop()
30
31    serv = app._make_server()
32
33    loop.run_until_complete(app.startup())
34
35    server = loop.run_until_complete(
36        loop.create_server(lambda: serv, host=host, port=port)
37    )
38
39    try:
40        print(f"Started server on {host}:{port}")
41        loop.run_until_complete(server.serve_forever())
42    except KeyboardInterrupt:
43        loop.run_until_complete(app.shutdown())
44        server.close()
45        loop.run_until_complete(server.wait_closed())
46        loop.stop()

完善异常处理

至此,我们已经开发好了大部分核心特性,但是我们还缺少异常处理机制。 Aiohttp允许开发人员以处理原生 Python 异常的方式去处理 web 异常,这也是其强大的特性之一。它实现上结合了 Exception 类以及 Response 类,非常的灵活,因此我们也来实现类似的机制。

首先,我们要创建 HTTPException 基类,并基于该类来实现一些我们可能会需要的助手类:HTTPNotFound 用于路径无法识别的情况、HTTPBadRequest 用于用户侧的问题、HTTPFound 用于重定向。

 1from .response import Response
 2
 3class HTTPException(Response, Exception):
 4    status_code = None
 5
 6    def __init__(self, reason=None, content_type=None):
 7        self._reason = reason
 8        self._content_type = content_type
 9
10        Response.__init__(
11            self,
12            body=self._reason,
13            status=self.status_code,
14            content_type=self._content_type or "text/plain",
15        )
16
17        Exception.__init__(self, self._reason)
18
19
20class HTTPNotFound(HTTPException):
21    status_code = 404
22
23
24class HTTPBadRequest(HTTPException):
25    status_code = 400
26
27
28class HTTPFound(HTTPException):
29    status_code = 302
30
31    def __init__(self, location, reason=None, content_type=None):
32        super().__init__(reason=reason, content_type=content_type)
33        self.add_header("Location", location)

然后,我们需要修改一下 Application._handler 来实际捕获 web 异常。

application.py

 1class Application:
 2    ...
 3    async def _handler(self, request, response_writer):
 4        """Process incoming request"""
 5        try:
 6            match_info, handler = self._router.resolve(request)
 7
 8            request.match_info = match_info
 9
10            if self._middlewares:
11                for md in self._middlewares:
12                    handler = partial(md, handler=handler)
13
14            resp = await handler(request)
15        except HTTPException as exc:
16            resp = exc
17
18        ...

现在我们可以删除 UrlDispatcher 中的_not_found_method_not_allowed 助手方法了。取而代之的是抛出对应的异常。

router.py

 1class UrlDispatcher:
 2    ...
 3    def resolve(self, request):
 4        for (method, pattern), handler in self._routes.items():
 5            match = re.match(pattern, request.url.raw_path)
 6
 7            if match is None:
 8                raise HTTPNotFound(reason=f"Could not find {request.url.raw_path}")
 9
10            if method != request.method:
11                raise HTTPBadRequest(reason=f"{request.method} not allowed for {request.url.raw_path}")
12
13            return match.groupdict(), handler
14
15        ...

在出现某些反常的情况时,我们并不想去的破坏应用程序的运行,因此我们最好为服务器内部错误添加一个标准格式的响应。让我们编写一个简单的 html 模板,以及用于格式化异常的助手方法。

helpers.py

 1import traceback
 2
 3from .response import Response
 4
 5server_exception_templ = """
 6<div>
 7    <h1>500 Internal server error</h1>
 8    <span>Server got itself in trouble : <b>{exc}</b><span>
 9    <p>{traceback}</p>
10</div>
11"""
12
13
14def format_exception(exc):
15    resp = Response(status=500, content_type="text/html")
16    trace = traceback.format_exc().replace("\n", "</br>")
17    msg = server_exception_templ.format(exc=str(exc), traceback=trace)
18    resp.add_body(msg)
19    return resp

这非常简单,我们现在捕获了 Application._handler 中生成的所有 Exception ,并使用我们的助手方法生成实际的 html 响应。

application.py

 1class Application:
 2    ...
 3    async def _handler(self, request, response_writer):
 4        """Process incoming request"""
 5        try:
 6            match_info, handler = self._router.resolve(request)
 7
 8            request.match_info = match_info
 9
10            if self._middlewares:
11                for md in self._middlewares:
12                    handler = partial(md, handler=handler)
13
14            resp = await handler(request)
15        except HTTPException as exc:
16            resp = exc
17        except Exception as exc:
18            resp = format_exception(exc)
19        ...

优雅地退出

最后,我们要为用于正确关闭应用程序的过程设置信号处理机制。让我们把 run_app修改成下面这个样子:

application.py

 1...
 2
 3def run_app(app, host="127.0.0.1", port=8080, loop=None):
 4    if loop is None:
 5        loop = asyncio.get_event_loop()
 6
 7    serv = app._make_server()
 8
 9    loop.run_until_complete(app.startup())
10
11    server = loop.run_until_complete(
12        loop.create_server(lambda: serv, host=host, port=port)
13    )
14
15    loop.add_signal_handler(
16        signal.SIGTERM, lambda: asyncio.ensure_future(app.shutdown())
17    )
18
19    ...

应用程序示例

我们的工具包已经准备就绪了。现在,让我们为之前的应用示例添加生命周期钩子和异常处理。

app.py

 1from .application import Application, run_app
 2
 3async def on_startup(app):
 4    # you may query here actual db, but for an example let's just use simple set.
 5    app.db = {"john_doe",}
 6
 7async def log_middleware(request, handler):
 8    print(f"Received request to {request.url.raw_path}")
 9    return await handler(request)
10
11async def handler(request):
12    username = request.match_info["username"]
13    if username not in request.app.db:
14        raise HTTPNotFound(reason=f"No such user with as {username} :(")
15      
16    return Response(f"Welcome, {username}!")
17
18app = Application(middlewares=[log_middleware])
19
20app.on_startup.append(on_startup)
21
22app.router.add_get("/{username}", handler)
23
24if __name__ == "__main__":
25    run_app(app)

如果我们正确完成了所有操作,现在就可以看到每个请求的日志消息了。同时,应用程序会响应欢迎信息给已注册用户的请求,响应 HTTPNotFound 给那些未注册用户或无法识别路由的请求。

总结

aiohttpsanic的启发,我们用了 500 行代码手写了一个非常简单,而又功能强大的微型框架。诚然,它还不能用于生产环境,因为它还缺少很多实用且重要的特性,如更健壮的服务器,对 http 规范的完整支持,以及 web 套接字等等。但是,我相信在这个过程中,我们更好地理解了这些工具是如何被构建的。正如著名物理学家理查德·费曼所说:“如果我不能创造某个事物,那就说明我对它的理解还不够”。希望你能够喜欢这个指南,再见:wave:。