一、共享全局变量资源竞争
一个线程写入、一个线程读取没问题,如果有两个线程对某一资源同时写入时,就可能会产生资源竞争。
import threading
num = 0
def demo1(count):
global num
for i in range(count):
num += 1
print('demo1--%d' % num)
def demo2(count):
global num
for i in range(count):
num += 1
print('demo2--%d' % num)
def main():
t1 = threading.Thread(target=demo1, args=(100,))
t2 = threading.Thread(target=demo2, args=(100,))
t1.start()
t2.start()
print('main--%d' % num)
if __name__ == '__main__':
main()
打印
demo1--100
demo2--200
main--200
当参数进一步扩大时:
import threading
num = 0
def demo1(count):
global num
for i in range(count):
num += 1
print('demo1--%d' % num)
def demo2(count):
global num
for i in range(count):
num += 1
print('demo2--%d' % num)
def main():
t1 = threading.Thread(target=demo1, args=(1000000,))
t2 = threading.Thread(target=demo2, args=(1000000,))
t1.start()
t2.start()
print('main--%d' % num)
if __name__ == '__main__':
main()
打印
main--181091
demo2--1209597
demo1--1410717
解释:
num为全局变量,最开始为0,在demo1()
、demo2()
两个子线程中有for循环,在每个子线程的循环中:
(1)先获取num的值;
(2)再把获取到的num加1;
(3)并把结果保存到num。
当执行时,在一个子线程执行到第(2)步时,还未保存,此时轮转到另一个子线程,此时由于第一个子线程还未执行到第(3)步,为保存num,所以num的值为0,执行到第(2)步,也加1,此时回到第一个子线程执行第(3)步保存num,为1,再到第二个子线程执行第三步也保存,所以也为1。
上述现象是一个概率性问题,当循环次数越多时,发生的概率越大,因此循环次数为100时,未发生此现象,循环次数为1000000时,有较明显的现象。
此时即发生了资源竞争。
对执行过程进行验证(查看字节码):
import dis
def add_num(a):
a += 1
print(dis.dis(add_num))
打印
36 0 LOAD_FAST 0 (a)
2 LOAD_CONST 1 (1)
4 INPLACE_ADD
6 STORE_FAST 0 (a)
8 LOAD_CONST 0 (None)
10 RETURN_VALUE
None
说明:
(1)LOAD_FAST:加载a;
(2)LOAD_CONST:加载常量值1;
(3)INPLACE_ADD:执行add加法;
(4)STORE_FAST:赋值给a。
显然,这个过程与之前解释资源竞争时子线程执行时的过程类似。
二、互斥锁和死锁
1.互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制;某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定,其他线程不能改变,只到该线程释放资源,将资源的状态变成非锁定,其他的线程才能再次锁定该资源。
互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
import threading
num = 0
# 创建一个互斥锁
mutex = threading.Lock()
def demo1(count):
global num
# 加锁
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
print('demo1--%d' % num)
def demo2(count):
global num
# 加锁
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
print('demo2--%d' % num)
def main():
t1 = threading.Thread(target=demo1, args=(1000000,))
t2 = threading.Thread(target=demo2, args=(1000000,))
t1.start()
t2.start()
print('main--%d' % num)
if __name__ == '__main__':
main()
打印
main--166589
demo1--1000000
demo2--2000000
此时,子线程1和子线程2的打印正常,但是主线程的打印结果仍然无规律。
解释:
执行t1.start()和t2.start()后,两个子线程运行,此时会继续向下执行,主线程会打印此时num的值,由于子线程仍可能未运行结束,即num的值未完成相加到2000000,所以会最先打印main,等到子线程1执行完打印1000000,释放锁,再运行子线程2,打印2000000。
进一步改进:
import threading
import time
num = 0
# 创建一个互斥锁
mutex = threading.Lock()
def demo1(count):
global num
# 加锁
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
print('demo1--%d' % num)
def demo2(count):
global num
# 加锁
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
print('demo2--%d' % num)
def main():
t1 = threading.Thread(target=demo1, args=(1000000,))
t2 = threading.Thread(target=demo2, args=(1000000,))
t1.start()
t2.start()
time.sleep(2)
print('main--%d' % num)
if __name__ == '__main__':
main()
打印
demo1--1000000
demo2--2000000
main--2000000
此时,执行t1.start()
和t2.start()
后暂停2秒,足够子线程1和子线程2执行完毕,所以先打印,最后主线程打印出num最后的值2000000。
2.死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()
# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name+'----do1---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 对mutexA解锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()
# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name+'----do2---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 对mutexB解锁
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
显示:
显然, 两个线程陷入相互等待的死锁,程序不能正常运行,直到手动停止程序或占满内存程序崩溃。
避免死锁:
- 程序设计时要尽量避免
- 添加超时时间
同时,创建的锁在加锁之后只能在释放之后才能再次加锁,否则也会陷入死锁等待,如下
import threading
import time
num = 0
# 创建一个互斥锁
mutex = threading.Lock()
def demo1(count):
global num
# 加锁
mutex.acquire()
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
mutex.acquire()
print('demo1--%d' % num)
def demo2(count):
global num
# 加锁
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
print('demo2--%d' % num)
def main():
t1 = threading.Thread(target=demo1, args=(100,))
t2 = threading.Thread(target=demo2, args=(100,))
t1.start()
t2.start()
time.sleep(2)
print('main--%d' % num)
if __name__ == '__main__':
main()
显示:
易知,主线程一直在等待子线程执行结束,但是子线程demo1()
两次枷锁、陷入死锁。
要想重用锁对象,需要使用RLock:
import threading
import time
num = 0
# 创建一个可重入锁
mutex = threading.RLock()
def demo1(count):
global num
# 加锁
mutex.acquire()
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
mutex.release()
print('demo1--%d' % num)
def demo2(count):
global num
# 加锁
mutex.acquire()
for i in range(count):
num += 1
# 解锁
mutex.release()
print('demo2--%d' % num)
def main():
t1 = threading.Thread(target=demo1, args=(100,))
t2 = threading.Thread(target=demo2, args=(100,))
t1.start()
t2.start()
time.sleep(2)
print('main--%d' % num)
if __name__ == '__main__':
main()
显示:
死锁扩展–银行家算法
背景知识:
一个银行家如何将一定数目的资金安全地借给若干个客户,使这些客户既能借到钱完成要干的事,同时银行家又能收回全部资金而不至于破产,这就是银行家问题。这个问题同操作系统中资源分配问题十分相似:银行家就像一个操作系统,客户就像运行的进程,银行家的资金就是系统的资源。
问题描述:
一个银行家拥有一定数量的资金,有若干个客户要贷款。每个客户须在一开始就声明他所需贷款的总额。若该客户贷款总额不超过银行家的资金总数,银行家可以接收客户的要求。客户贷款是以每次一个资金单位(如1万RMB等)的方式进行的,客户在借满所需的全部单位款额之前可能会等待,但银行家须保证这种等待是有限的,可完成的。
例如:有三个客户C1,C2,C3,向银行家借款,该银行家的资金总额为10个资金单位,其中C1客户要借9各资金单位,C2客户要借3个资金单位,C3客户要借8个资金单位,总计20个资金单位。某一时刻的状态如图所示:
对于a图的状态,按照安全序列的要求,我们选的第一个客户应满足该客户所需的贷款小于等于银行家当前所剩余的钱款,可以看出只有C2客户能被满足:C2客户需1个资金单位,小银行家手中的2个资金单位,于是银行家把1个资金单位借给C2客户,使之完成工作并归还所借的3个资金单位的钱,进入b图。同理,银行家把4个资金单位借给C3客户,使其完成工作,在c图中,只剩一个客户C1,它需7个资金单位,这时银行家有8个资金单位,所以C1也能顺利借到钱并完成工作。最后(见图d)银行家收回全部10个资金单位,保证不赔本。那麽客户序列{C1,C2,C3}就是个安全序列,按照这个序列贷款,银行家才是安全的。否则的话,若在图b状态时,银行家把手中的4个资金单位借给了C1,则出现不安全状态:这时C1,C3均不能完成工作,而银行家手中又没有钱了,系统陷入僵持局面,银行家也不能收回投资。
综上所述,银行家算法是从当前状态出发,逐个按安全序列检查各客户谁能完成其工作,然后假定其完成工作且归还全部贷款,再进而检查下一个能完成工作的客户,…。如果所有客户都能完成工作,则找到一个安全序列,银行家才是安全的。
三、线程同步-Condition
背景:
小度:小爱同学;
小爱同学:我在呢;
小度:现在几点了?
小爱同学:你猜猜现在几点了
代码实现尝试:
import threading
class XiaoAi(threading.Thread):
def __init__(self):
super().__init__(name='小爱同学')
def run(self):
print('{}:我在呢'.format(self.name))
class XiaoDu(threading.Thread):
def __init__(self):
super().__init__(name='小度')
def run(self):
print('{}:小爱同学'.format(self.name))
if __name__ == '__main__':
xiaoai = XiaoAi()
xiaodu = XiaoDu()
xiaodu.start()
xiaoai.start()
打印
小度:小爱同学
小爱同学:我在呢
进一步扩展尝试:
import threading
class XiaoAi(threading.Thread):
def __init__(self):
super().__init__(name='小爱同学')
def run(self):
print('{}:我在呢'.format(self.name))
print('{}:你猜猜现在几点了'.format(self.name))
class XiaoDu(threading.Thread):
def __init__(self):
super().__init__(name='小度')
def run(self):
print('{}:小爱同学'.format(self.name))
print('{}:现在几点了?'.format(self.name))
if __name__ == '__main__':
xiaoai = XiaoAi()
xiaodu = XiaoDu()
xiaodu.start()
xiaoai.start()
打印
小度:小爱同学
小度:现在几点了?
小爱同学:我在呢
小爱同学:你猜猜现在几点了
显然,没有达到我们想要的效果。
加锁再次尝试:
import threading
class XiaoAi(threading.Thread):
def __init__(self,lock):
super().__init__(name='小爱同学')
self.lock = lock
def run(self):
self.lock.acquire()
print('{}:我在呢'.format(self.name))
self.lock.release()
self.lock.acquire()
print('{}:你猜猜现在几点了'.format(self.name))
self.lock.release()
class XiaoDu(threading.Thread):
def __init__(self,lock):
super().__init__(name='小度')
self.lock = lock
def run(self):
self.lock.acquire()
print('{}:小爱同学'.format(self.name))
self.lock.release()
self.lock.acquire()
print('{}:现在几点了?'.format(self.name))
self.lock.release()
if __name__ == '__main__':
mutex = threading.Lock()
xiaoai = XiaoAi(mutex)
xiaodu = XiaoDu(mutex)
xiaodu.start()
xiaoai.start()
打印
小度:小爱同学
小度:现在几点了?
小爱同学:我在呢
小爱同学:你猜猜现在几点了
结果和之前一样,还是没有达到想要的效果,用锁是达不到效果的。
用线程同步尝试:
Condition类中有两个方法:
__enter__()
调用的是Lock的acquire()
方法;
__exit__()
调用的是Lock的release()
方法。
由此可知,Condition对象可以用上下文处理器处理。
import threading
class XiaoAi(threading.Thread):
def __init__(self,cond):
super().__init__(name='小爱同学')
self.cond = cond
def run(self):
with self.cond:
self.cond.wait()
print('{}:我在呢'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:你猜猜现在几点了'.format(self.name))
self.cond.notify()
class XiaoDu(threading.Thread):
def __init__(self,cond):
super().__init__(name='小度')
self.cond = cond
def run(self):
self.cond.acquire()
print('{}:小爱同学'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:现在几点了?'.format(self.name))
self.cond.notify()
self.cond.wait()
self.cond.release()
if __name__ == '__main__':
cond = threading.Condition()
xiaoai = XiaoAi(cond)
xiaodu = XiaoDu(cond)
xiaodu.start()
xiaoai.start()
显示:
由于Condition类实现了__enter__()
和方法,所以可以用上下文处理器进行处理,即用with打开、关闭对象,这和self.cond.acquire()
和self.cond.release()
的效果是一样的。
显然,程序执行阻塞,这是因为子线程的调用顺序有问题,进行调试:
import threading
class XiaoAi(threading.Thread):
def __init__(self,cond):
super().__init__(name='小爱同学')
self.cond = cond
def run(self):
with self.cond:
print(4)
self.cond.wait()
print(5)
print('{}:我在呢'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:你猜猜现在几点了'.format(self.name))
self.cond.notify()
class XiaoDu(threading.Thread):
def __init__(self,cond):
super().__init__(name='小度')
self.cond = cond
def run(self):
self.cond.acquire()
print('{}:小爱同学'.format(self.name))
print(1)
self.cond.notify()
print(2)
self.cond.wait()
print(3)
print('{}:现在几点了?'.format(self.name))
self.cond.notify()
self.cond.wait()
self.cond.release()
if __name__ == '__main__':
cond = threading.Condition()
xiaoai = XiaoAi(cond)
xiaodu = XiaoDu(cond)
xiaodu.start()
xiaoai.start()
显示:
易知,执行顺序是:
先执行XiaoDu类的print('{}:小爱同学'.format(self.name))
和cond.notify()
,xiaodu一直cond.wait()
,然后执行XiaoAi的cond.acquire()
,也一直cond.wait()
,两者均在等待,陷入死锁。
交换xiaodu和xiaoai的线程创建和执行顺序即可:
import threading
class XiaoAi(threading.Thread):
def __init__(self,cond):
super().__init__(name='小爱同学')
self.cond = cond
def run(self):
with self.cond:
self.cond.wait()
print('{}:我在呢'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:你猜猜现在几点了'.format(self.name))
self.cond.notify()
class XiaoDu(threading.Thread):
def __init__(self,cond):
super().__init__(name='小度')
self.cond = cond
def run(self):
self.cond.acquire()
print('{}:小爱同学'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:现在几点了?'.format(self.name))
self.cond.notify()
self.cond.wait()
self.cond.release()
if __name__ == '__main__':
cond = threading.Condition()
xiaoai = XiaoAi(cond)
xiaodu = XiaoDu(cond)
xiaoai.start()
xiaodu.start()
打印
小度:小爱同学
小爱同学:我在呢
小度:现在几点了?
小爱同学:你猜猜现在几点了
三个子线程的尝试:
import threading
class XiaoAi(threading.Thread):
def __init__(self,cond):
super().__init__(name='小爱同学')
self.cond = cond
def run(self):
with self.cond:
self.cond.wait()
print('{}:我在呢'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:你猜猜现在几点了'.format(self.name))
self.cond.notify()
class XiaoDu(threading.Thread):
def __init__(self,cond):
super().__init__(name='小度')
self.cond = cond
def run(self):
self.cond.acquire()
print('{}:小爱同学,天猫精灵'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:现在几点了?'.format(self.name))
self.cond.notify()
self.cond.wait()
self.cond.release()
class Jingling(threading.Thread):
def __init__(self,cond):
super().__init__(name='天猫精灵')
self.cond = cond
def run(self):
with self.cond:
self.cond.wait()
print('{}:我也在呢'.format(self.name))
self.cond.notify()
self.cond.wait()
print('{}:你猜猜现在几点了'.format(self.name))
self.cond.notify()
if __name__ == '__main__':
cond = threading.Condition()
xiaoai = XiaoAi(cond)
xiaodu = XiaoDu(cond)
jingling = Jingling(cond)
xiaoai.start()
jingling.start()
xiaodu.start()
打印
小度:小爱同学,天猫精灵
小爱同学:我在呢
天猫精灵:我也在呢
小度:现在几点了?
小爱同学:你猜猜现在几点了
天猫精灵:你猜猜现在几点了
四、多任务版UDP聊天
实现思路:
- 创建套接字
- 绑定本地信息
- 获取对方IP和端口
- 发送、接收数据
- 创建两个线程,并执行功能
NetAssist配置如下:
代码实现:
import socket
import threading
def recv_msg(udp_socket):
'''接收数据'''
while True:
recv_data = udp_socket.recvfrom(1024)
print(recv_data[0].decode('gbk'))
def send_msg(udp_socket, dest_ip, dest_port):
'''发送数据'''
while True:
send_data = input('Inoput data to send:')
udp_socket.sendto(send_data.encode('gbk'), (dest_ip, dest_port))
def main():
#创建套接字
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
#绑定
udp_socket.bind(('',7890))
# 获取对方的IP和端口
dest_ip = input('Input IP:')
dest_port = int(input('Input Port:'))
t_recv = threading.Thread(target=recv_msg, args=(udp_socket, ))
t_send = threading.Thread(target=send_msg, args=(udp_socket, dest_ip, dest_port))
t_recv.start()
t_send.start()
if __name__ == '__main__':
main()
演示效果:
五、进程介绍
1.概念辨析
- 进程:
正在运行的代码+用到的资源; - 程序:
没有执行的代码,是静态的。
2.进程的状态
主要包括就绪、运行、等待三个状态,是一个循环往复的过程。
3.使用进程实现多任务
multiprocessing模块就是跨平台的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。
线程测试:
import threading
import time
def demo1():
for _ in range(5):
print('--1--')
time.sleep(1)
def demo2():
for _ in range(5):
print('--2--')
time.sleep(1)
def main():
t1 = threading.Thread(target=demo1)
t2 = threading.Thread(target=demo2)
t1.start()
t2.start()
if __name__ == '__main__':
main()
打印:
--1--
--2--
--1--
--2--
--1--
--2--
--1--
--2--
--1--
--2--
改为进程:
import multiprocessing
import time
def demo1():
for _ in range(5):
print('--1--')
time.sleep(1)
def demo2():
for _ in range(5):
print('--2--')
time.sleep(1)
def main():
p1 = multiprocessing.Process(target=demo1)
p2 = multiprocessing.Process(target=demo2)
p1.start()
p2.start()
if __name__ == '__main__':
main()
打印:
--1--
--2--
--1--
--2--
--1--
--2--
--1--
--2--
--1--
--2--
显然,进程和线程的代码实现是相似的,结果也是很近似的。
但是线程和进程是有很大不同的:
观察任务管理器:
是线程时,
此时只有一个Python进程。
是进程时,
此时有两个Python进程,分别代表代码中的两个子进程。
一个主进程和两个子进程,子进程将主进程的代码复制,再执行,会造成资源浪费,但是会比单进程效率要快。
在Ubuntu中运行:
import os
import time
#创建子线程
pid = os.fork()
print('Hello World')
#判断是子线程
if pid == 0:
print('s_fork:{},f_fork:{}'.format(os.getpid(),os.getppid()))
else:
print('f_fork:{}'.format(os.getpid()))
执行(Windows运行会报错)结果为
Hello World
f_fork:2180
Hello World
s_fork:2181,f_fork:2180
打印了两次Corley。
解释:
os.fork()
复制父进程代码创建了一个子进程,父进程和子进程分别执行一次,先后打印出Hello World,并分别进入if判断,两次打印id。
转载:https://blog.csdn.net/CUFEECR/article/details/104174012