一.基础准备. 

1. local类

  对于一个类,实例化得到它的对象后,如果开启多个线程对它的属性进行操作,会发现数据时不安全的

import time
from threading import Thread
import threading

class Foo(object):
    pass

foo = Foo()

def add(i):
    foo.num = i
    time.sleep(1)
    print(foo.num,i,threading.current_thread().ident,foo)

for i in range(10):
    task = Thread(target=add,args=(i,))
    task.start()

##结果##
9 9 5616 <__main__.Foo object at 0x0000018992A05400>
9 8 9780 <__main__.Foo object at 0x0000018992A05400>
9 6 4692 <__main__.Foo object at 0x0000018992A05400>
9 3 2168 <__main__.Foo object at 0x0000018992A05400>
9 1 4424 <__main__.Foo object at 0x0000018992A05400>
9 0 10264 <__main__.Foo object at 0x0000018992A05400>
9 7 11728 <__main__.Foo object at 0x0000018992A05400>
9 5 6688 <__main__.Foo object at 0x0000018992A05400>
9 2 808 <__main__.Foo object at 0x0000018992A05400>
9 4 1160 <__main__.Foo object at 0x0000018992A05400>

# 以上结论得知:
# 线程操作公共对象 产生不安全现象

  为了保证对属性操作的安全,而且又不使用锁(使用锁会使异步线程变成同步操作), 可以使用继承local类的方式实现

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。
from threading import local

class Foo(local):
    pass

foo = Foo()

def add(i):
    foo.num = i
    time.sleep(1)
    print(foo.num,i,threading.current_thread().ident,foo)

for i in range(10):
    task = Thread(target=add,args=(i,))
    task.start()

##结果##
8 8 10372 <__main__.Foo object at 0x000002157C037648>
9 9 8604 <__main__.Foo object at 0x000002157C037648>
6 6 9512 <__main__.Foo object at 0x000002157C037648>
5 5 1240 <__main__.Foo object at 0x000002157C037648>
3 3 5404 <__main__.Foo object at 0x000002157C037648>
2 2 13548 <__main__.Foo object at 0x000002157C037648>
0 0 10516 <__main__.Foo object at 0x000002157C037648>
7 7 8644 <__main__.Foo object at 0x000002157C037648>
4 4 8420 <__main__.Foo object at 0x000002157C037648>
1 1 4372 <__main__.Foo object at 0x000002157C037648>

 

  多线程实现的栈(简易), 注意使用了local类. 使用local能保证每一个线程都能对类的属性进行操作,而且互不干扰

from threading import local,Thread
import threading

class MyLocalStack(local):
    stack = {}
    pass

mls = MyLocalStack()

def ts(i):
    a = threading.current_thread().ident
    mls.stack[a] = [f"r{i+1}",f"s{i+1}"]
    print(mls.stack,a)
    time.sleep(1)
    print(mls.stack[a].pop(),mls.stack[a].pop(),a,'删除')

    # time.sleep(0.0089)
    mls.stack.pop(a)
    print(mls.stack , a, '删除')

for i in range(5):
    task = Thread(target=ts,args=(i,))
    task.start()

#堆栈
    # 程序员
# 先进后出 后进先出
# 先进先出 后进后出

  结果

Flask系列10-- 请求上下文分析 随笔 第1张

 2. app()

from flask import Flask
app = Flask(__name__)
app.run()

  这个app就是,flask启动时的那个对象, 一般函数加()的意思是执行这个函数,而这里对象加() 的意思就是执行app对象的__call__方法, 原因文档里面解释的非常清楚.看下图

  Flask系列10-- 请求上下文分析 随笔 第2张

二. run_simple()源码分析

1.  run_simple() 源码详解,  如何调用了app.__call__()

  首先运行时调用了app.run()方法, 相当于调用了app.__call__(),而为什么调用了app.__call__()从哪看出来的呢, 百度的那些粘贴人总是说werkzeug的rum_simple方法,但从没有具体的解释,我就研究了一下

from flask import Flask
app = Flask(__name__)
app.run()

Flask系列10-- 请求上下文分析 随笔 第3张

def run(self, host=None, port=None, debug=None,
    load_dotenv=True, **options): # self = app = Flask()

        from werkzeug.serving import run_simple  # 引入werkzeug相关 run_simple开始运行
    
        try:  # host 127.0.0.1 port=5000 self=app=Flask()
            run_simple(host, port, self, **options)  # self = app = Flask()    

run_simple()中将执行inner()函数

def run_simple(
    hostname,
    port,
    application,  # self = app = Flask() application: the WSGI application to execute
    use_reloader=False,
    use_debugger=False,
    use_evalex=True,
    extra_files=None,
    reloader_interval=1,
    reloader_type="auto",
    threaded=False,
    processes=1,
    request_handler=None,
    static_files=None,
    passthrough_errors=False,
    ssl_context=None,
):

        from ._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval, reloader_type) # 这里开始 使用inner函数
    else:
        inner()  # 这里开始 使用inner函数   

来看inner()函数,这个inner()函数是被包含在run_simple()函数中的

    def inner():
        try:
            fd = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(
            hostname,
            port,
            application,   # self = app = Flask()  第三个位置参数
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )  # srv就是返回了一个 # BaseWSGIServer实例BaseWSGIServer(app) srv.app = app
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever() # srv(self = app = Flask() ) 

inner()中注意make_server()

def make_server(
    host=None,
    port=None,
    app=None,    # self = app = Flask()
    threaded=False,
    processes=1,
    request_handler=None,
    passthrough_errors=False,
    ssl_context=None,
    fd=None,
):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.

    创建一个新的server实例
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(  # 多线程wsgiserver启动
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )   # self = app = Flask()
    elif processes > 1:  # 多个进程时
        return ForkingWSGIServer(
            host,
            port,
            app,   # self = app = Flask()
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        ) # self = app = Flask()

来看BaseWSGIServer

class BaseWSGIServer(HTTPServer, object):

    """Simple single-threaded, single-process WSGI server."""

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE

    def __init__(
        self,
        host,
        port,
        app,  # self = app = Flask()
        handler=None,
        passthrough_errors=False,
        ssl_context=None,
        fd=None,
    ):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_address_family(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family, socket.SOCK_STREAM)
            port = 0

        server_address = get_sockaddr(host, int(port), self.address_family)

        # remove socket file if it already exists
        if self.address_family == af_unix and os.path.exists(server_address):
            os.unlink(server_address)
        HTTPServer.__init__(self, server_address, handler) #handler = WSGIRequestHandler

        self.app = app  # self = app = Flask()
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

来看WSGIrequesthandler ,部分代码省略

    def run_wsgi(self):
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        headers_set = []
        headers_sent = []

        def execute(app):
            application_iter = app(environ, start_response) # app()= self.wsgi_app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
            finally:
                if hasattr(application_iter, "close"):
                    application_iter.close()
                application_iter = None

        try:
            execute(self.server.app)  #
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            if self.server.passthrough_errors:
                raise
            from .debug.tbtools import get_current_traceback

            traceback = get_current_traceback(ignore_system_exceptions=True)
            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if not headers_sent:
                    del headers_set[:]
                execute(InternalServerError())
            except Exception:
                pass
            self.server.log("error", "Error on request:\n%s", traceback.plaintext) 

 至此可以看出用到了app()进而就可以知道app.run()时实际就是执行了app.__call__()

 三.请求上下文源码分析

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄