Python线程,Python之线程(二)

 2023-09-24 阅读 21 评论 0

摘要:一 ThreadLocal 我们知道多线程环境下,每一个线程均可以使用所属进程的全局变量。如果一个线程对全局变量进行了修改,将会影响到其他所有的线程。为了避免多个线程同时对变量进行修改,引入了线程同步机制,通过互斥锁,条件变量或者读写锁来

一 ThreadLocal

我们知道多线程环境下,每一个线程均可以使用所属进程的全局变量。如果一个线程对全局变量进行了修改,将会影响到其他所有的线程。为了避免多个线程同时对变量进行修改,引入了线程同步机制,通过互斥锁,条件变量或者读写锁来控制对全局变量的访问。

只用全局变量并不能满足多线程环境的需求,很多时候线程还需要拥有自己的私有数据,这些数据对于其他线程来说不可见。因此线程中也可以使用局部变量,局部变量只有线程自身可以访问,同一个进程下的其他线程不可访问。

 python 还提供了 ThreadLocal 变量,它本身是一个全局变量,但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的。

ThreadLocal 真正做到了线程之间的数据隔离,并且使用时不需要手动获取自己的线程 ID,如下示例:

from threading import local,current_thread,Thread
import time
# 创建一个Goods对象
class Goods(object):
    def __init__(self,name,weight):
        self.name = name
       
self.weight = weight

   
def show(self):
        print("线程信息:%s" %current_thread().name)
        print("GoodsName=> %s\r\nGoods Weight=>%s" %(self.name,self.weight))



# 创建全局ThreadLocal对象:
global_goods = local()

def process(goods):
    global_goods.goods = goods
   
show_goods()

def show_goods():
    goods = global_goods.goods
    print(goods.show())
    time.sleep(1)


goods1 = Goods("大米",50)
goods2 = Goods("面粉",25)
goods3 = Goods("啤酒",40)

t1 = Thread(target=process, args=(goods1,),name="线程-----A")
t2 = Thread(target=process, args=(goods2,),name="线程-----B")
t3 = Thread(target=process, args=(goods3,),name="线程-----C")
t = []
t.append(t1)
t.append(t2)
t.append(t3)

for i in t:
    i.start()
for i in t:
    i.join()

 

二 协程

理解协程之前,我们先看一下函数,我们知道函数都是顺序执行的,而且函数调用函数是通过栈实现的,比如函数1调用函数2,函数2调用函数3, 然后函数3返回,函数2返回,函数1返回

 

协程,又称为微线程。可以在执行的过程中,然后中断,接着干别的事情,然后回来接着执行。类似于CPU中断。比如CPU执行某一个线程,然后执行一半,切换到别的线程,然后再切回来,根据上下文可以知道上次执行到哪儿了。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,因此:协程能够保留上一次调用的状态

 

协程可以实现多线程的效果:

我们知道单核CPU可以跑多个线程,不是同时跑多个而是切换速度太快,已达到同时执行多个的目的。

那么协程也就是这个原理,单线程在不同的代码块或者函数中切换,每一次的状态可以存储到寄存器,类似于每一次多线程切换,上下文信息存储到CPU的寄存器。不同的是多线程切换是CPU控制的,而协程代码块或者函数切换是用户控制的

 

 

协程优点:

# 单线程,无需线程切换,只是代码块或者函数切换,时间比CPU线程切换花的更少

# 由于串行执行,且同一时间只有一个操作执行,那么就无需原子操作,也就不需要锁,减少了同步的开销

# 方便控制切换

协程缺点:

# 无法利用多核资源

# 阻塞操作,会阻塞整个程序

 

Gevent:

Python线程,Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。

gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

 

不改变源代码而对功能进行追加和变更,统称为“猴子补丁”。所以猴子补丁并不是Python中专有的。猴子补丁这种东西充分利用了动态语言的灵活性,可以对现有的语言API进行追加,替换,修改Bug,甚至性能优化等等。

二十核四十线程cpu。使用猴子补丁的方式,gevent能够修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。也就是通过猴子补丁的monkey.patch_xxx()来将python标准库中模块或函数改成gevent中的响应的具有协程的协作式对象。这样在不改变原有代码的情况下,将应用的阻塞式方法,变成协程式的。

 

import gevent

def func1():
    print('\033[31;1m[func1]Hi,Andy\033[0m')
    gevent.sleep(2)
    print('\033[31;1m[func1]Hi, Lady\033[0m')

def func2():
    print('\033[32;1m[func2]Hello,Judy\033[0m')
    gevent.sleep(1)
    print('\033[32;1m[func2]Hello,Tidy\033[0m')

'''
gevent.spawn
发起两个操作
当遇到gevent.sleep()则会切换到另外一个操作执行,如果另外一个操作也遇到sleep,那么又切回到原始操作
'''
gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
])

 

 

遇到IO阻塞时会自动切换任务

from gevent import monkey;

monkey.patch_all()
import gevent
from urllib.request import urlopen

def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))


gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])

 

 

通过gevent实现单线程下的多socket并发

 

import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()
def server(port):
    s =socket.socket()
    s.bind('localhost',port)
    s.listen(500)
    while True:
        sock, addr = s.accept()
        gevent.spawn(handle, sock)


def handle(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8000)

import socket

HOST = 'localhost'
PORT = 8000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)

   
print
('Received', repr(data))
s.close()

 

三 事件驱动与异步IO模型

我们写服务器处理模型的程序时,一般有以下几种模型:

第一:创建一个新的进程处理请求

优点:简单

缺点:开销较大,导致服务器性能比较差

第二:创建一个新的线程处理请求

优点:在性能开销方面稍微好点

缺点:但是会涉及到线程同步问题

第三:请求放入一个事件列表,主进程通过非阻塞I/O方式来处理

优点:

缺点:写应用程序逻辑复杂

大部分UI编程模型也都是基于事件驱动的

3.1 传统编程模型和事件驱动模型的区别

创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:

# 有可能点击频率很少,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费

# 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;

# 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题

 

事件驱动模型:

# 有一个事件(消息)队列;

# 鼠标按下时,往这个队列中增加一个点击事件(消息);

# 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;

# 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;

 

3.2 事件驱动模型的使用场景

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

第一: 程序中有许多任务、

第二: 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)

第三: 在等待事件到来时,某些任务会阻塞。

 

四 select、poll、epoll

Python select()直接调用操作系统的IO接口,他监控sockets,opened files 和pipes何时变成可读,可写或者通信错误,select()使得同时监控多个连接变得简单,并且这比写一个长循环来等待和监控多客户端要高效

 

import select
import socket
import sys
import queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)

# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting upon %s port %s' % server_address)
server.bind(server_address)

# Listen for incoming connections
server.listen(5)

# Sockets from which we expect to read
inputs = [server]

# Sockets to which we expect to write
outputs = []

message_queues = {}
while inputs:

    # Wait for atleast one of the sockets to be ready for processing
   
print
('\nwaiting for the next event')
    readable, writable, exceptional= select.select(inputs, outputs, inputs)
    # Handle inputs
   
for s in readable:

        if s is server:
            # A "readable" server socket is ready to accept a connection
           
connection, client_address = s.accept()
            print('new connectionfrom', client_address)
            connection.setblocking(False)
            inputs.append(connection)

            # Give the connection a queue for data we want to send
           
message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
               
print
(sys.stderr, 'received"%s" from %s' % (data, s.getpeername()))
                message_queues[s].put(data)
                # Add output channel for response
               
if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
               
print
('closing', client_address, 'after readingno data')
                # Stop listening for input on the connection
               
if s in outputs:
                    outputs.remove(s)  # 既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
               
inputs.remove(s)  # inputs中也删除掉
               
s.close()  # 把这个连接关闭掉

                # Remove message queue
               
del message_queues[s]
    # Handle outputs
   
for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
           
print
('output queuefor', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print('sending"%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle"exceptional conditions"
   
for s in exceptional:
        print('handlingexceptional condition for', s.getpeername())
        # Stop listening for input on the connection
       
inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # Remove message queue
       
del message_queues[s]

 

客户端代码:

mport
socket
import sys

messages = ['This is the message. ',
            'It will besent ',
            'in parts.',
            ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM),
         socket.socket(socket.AF_INET, socket.SOCK_STREAM),
         ]

# Connect the socket to the port where the server is listening
print
>> sys.stderr, 'connecting to%s port %s' % server_address
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messageson both sockets
   
for s in socks:
        print >> sys.stderr, '%s: sending"%s"' % (s.getsockname(), message)
        s.send(message)

    # Read responseson both sockets
   
for s in socks:
        data = s.recv(1024)
        print >> sys.stderr, '%s: received"%s"' % (s.getsockname(), data)
        if not data:
            print >> sys.stderr, 'closingsocket', s.getsockname()
            s.close()

 

 

 

 

 

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/93029.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息