Flask系列10-- 请求上下文分析
一.基础准备.
1. local类
对于一个类,实例化得到它的对象后,如果开启多个线程对它的属性进行操作,会发现数据时不安全的
import time from threading import Thread import threading class Foo(object): pass foo = Foo() def add(i): foo.num = i time.sleep(1) print(foo.num,i,threading.current_thread().ident,foo) for i in range(10): task = Thread(target=add,args=(i,)) task.start() ##结果## 9 9 5616 <__main__.Foo object at 0x0000018992A05400> 9 8 9780 <__main__.Foo object at 0x0000018992A05400> 9 6 4692 <__main__.Foo object at 0x0000018992A05400> 9 3 2168 <__main__.Foo object at 0x0000018992A05400> 9 1 4424 <__main__.Foo object at 0x0000018992A05400> 9 0 10264 <__main__.Foo object at 0x0000018992A05400> 9 7 11728 <__main__.Foo object at 0x0000018992A05400> 9 5 6688 <__main__.Foo object at 0x0000018992A05400> 9 2 808 <__main__.Foo object at 0x0000018992A05400> 9 4 1160 <__main__.Foo object at 0x0000018992A05400>
# 以上结论得知:
# 线程操作公共对象 产生不安全现象
为了保证对属性操作的安全,而且又不使用锁(使用锁会使异步线程变成同步操作), 可以使用继承local类的方式实现
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。from threading import local class Foo(local): pass foo = Foo() def add(i): foo.num = i time.sleep(1) print(foo.num,i,threading.current_thread().ident,foo) for i in range(10): task = Thread(target=add,args=(i,)) task.start() ##结果## 8 8 10372 <__main__.Foo object at 0x000002157C037648> 9 9 8604 <__main__.Foo object at 0x000002157C037648> 6 6 9512 <__main__.Foo object at 0x000002157C037648> 5 5 1240 <__main__.Foo object at 0x000002157C037648> 3 3 5404 <__main__.Foo object at 0x000002157C037648> 2 2 13548 <__main__.Foo object at 0x000002157C037648> 0 0 10516 <__main__.Foo object at 0x000002157C037648> 7 7 8644 <__main__.Foo object at 0x000002157C037648> 4 4 8420 <__main__.Foo object at 0x000002157C037648> 1 1 4372 <__main__.Foo object at 0x000002157C037648>
多线程实现的栈(简易), 注意使用了local类. 使用local能保证每一个线程都能对类的属性进行操作,而且互不干扰
from threading import local,Thread import threading class MyLocalStack(local): stack = {} pass mls = MyLocalStack() def ts(i): a = threading.current_thread().ident mls.stack[a] = [f"r{i+1}",f"s{i+1}"] print(mls.stack,a) time.sleep(1) print(mls.stack[a].pop(),mls.stack[a].pop(),a,'删除') # time.sleep(0.0089) mls.stack.pop(a) print(mls.stack , a, '删除') for i in range(5): task = Thread(target=ts,args=(i,)) task.start() #堆栈 # 程序员 # 先进后出 后进先出 # 先进先出 后进后出
结果
2. app()
from flask import Flask app = Flask(__name__) app.run()
这个app就是,flask启动时的那个对象, 一般函数加()的意思是执行这个函数,而这里对象加() 的意思就是执行app对象的__call__方法, 原因文档里面解释的非常清楚.看下图
二. run_simple()源码分析
1. run_simple() 源码详解, 如何调用了app.__call__()
首先运行时调用了app.run()方法, 相当于调用了app.__call__(),而为什么调用了app.__call__()从哪看出来的呢, 百度的那些粘贴人总是说werkzeug的rum_simple方法,但从没有具体的解释,我就研究了一下
from flask import Flask app = Flask(__name__) app.run()
def run(self, host=None, port=None, debug=None,
load_dotenv=True, **options): # self = app = Flask()
from werkzeug.serving import run_simple # 引入werkzeug相关 run_simple开始运行
try: # host 127.0.0.1 port=5000 self=app=Flask()
run_simple(host, port, self, **options) # self = app = Flask()
run_simple()中将执行inner()函数
def run_simple( hostname, port, application, # self = app = Flask() application: the WSGI application to execute use_reloader=False, use_debugger=False, use_evalex=True, extra_files=None, reloader_interval=1, reloader_type="auto", threaded=False, processes=1, request_handler=None, static_files=None, passthrough_errors=False, ssl_context=None, ): from ._reloader import run_with_reloader run_with_reloader(inner, extra_files, reloader_interval, reloader_type) # 这里开始 使用inner函数 else: inner() # 这里开始 使用inner函数
来看inner()函数,这个inner()函数是被包含在run_simple()函数中的
def inner(): try: fd = int(os.environ["WERKZEUG_SERVER_FD"]) except (LookupError, ValueError): fd = None srv = make_server( hostname, port, application, # self = app = Flask() 第三个位置参数 threaded, processes, request_handler, passthrough_errors, ssl_context, fd=fd, ) # srv就是返回了一个 # BaseWSGIServer实例BaseWSGIServer(app) srv.app = app if fd is None: log_startup(srv.socket) srv.serve_forever() # srv(self = app = Flask() )
inner()中注意make_server()
def make_server( host=None, port=None, app=None, # self = app = Flask() threaded=False, processes=1, request_handler=None, passthrough_errors=False, ssl_context=None, fd=None, ): """Create a new server instance that is either threaded, or forks or just processes one request after another. 创建一个新的server实例 """ if threaded and processes > 1: raise ValueError("cannot have a multithreaded and multi process server.") elif threaded: return ThreadedWSGIServer( # 多线程wsgiserver启动 host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd ) # self = app = Flask() elif processes > 1: # 多个进程时 return ForkingWSGIServer( host, port, app, # self = app = Flask() processes, request_handler, passthrough_errors, ssl_context, fd=fd, ) else: return BaseWSGIServer( host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd ) # self = app = Flask()
来看BaseWSGIServer
class BaseWSGIServer(HTTPServer, object): """Simple single-threaded, single-process WSGI server.""" multithread = False multiprocess = False request_queue_size = LISTEN_QUEUE def __init__( self, host, port, app, # self = app = Flask() handler=None, passthrough_errors=False, ssl_context=None, fd=None, ): if handler is None: handler = WSGIRequestHandler self.address_family = select_address_family(host, port) if fd is not None: real_sock = socket.fromfd(fd, self.address_family, socket.SOCK_STREAM) port = 0 server_address = get_sockaddr(host, int(port), self.address_family) # remove socket file if it already exists if self.address_family == af_unix and os.path.exists(server_address): os.unlink(server_address) HTTPServer.__init__(self, server_address, handler) #handler = WSGIRequestHandler self.app = app # self = app = Flask() self.passthrough_errors = passthrough_errors self.shutdown_signal = False self.host = host self.port = self.socket.getsockname()[1]
来看WSGIrequesthandler ,部分代码省略
def run_wsgi(self): if self.headers.get("Expect", "").lower().strip() == "100-continue": self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n") self.environ = environ = self.make_environ() headers_set = [] headers_sent = [] def execute(app): application_iter = app(environ, start_response) # app()= self.wsgi_app(environ, start_response) try: for data in application_iter: write(data) if not headers_sent: write(b"") finally: if hasattr(application_iter, "close"): application_iter.close() application_iter = None try: execute(self.server.app) # except (_ConnectionError, socket.timeout) as e: self.connection_dropped(e, environ) except Exception: if self.server.passthrough_errors: raise from .debug.tbtools import get_current_traceback traceback = get_current_traceback(ignore_system_exceptions=True) try: # if we haven't yet sent the headers but they are set # we roll back to be able to set them again. if not headers_sent: del headers_set[:] execute(InternalServerError()) except Exception: pass self.server.log("error", "Error on request:\n%s", traceback.plaintext)
至此可以看出用到了app()进而就可以知道app.run()时实际就是执行了app.__call__()
三.请求上下文源码分析

更多精彩