线程
多任务:操作系统可以同时运行多个任务。
python 默认是单任务
线程: 被称为轻量级进程,是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是CPU调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源
主线程:当⼀个程序启动时,就有⼀个进程被操作系统(OS)创建,与此同时⼀个线程也⽴刻运 ⾏,该线程通常叫做程序的主线程
主线程的重要性:
1)是产生其他子线程的线程
2)通常它必须最后完成执行各种关闭动作
子线程:可以看做是程序执⾏的⼀条分⽀,当⼦线程启动后会和主线程⼀起同时执⾏
使用threading .Thread(target=函数名,args=(参数列表,元组))创建子线程对象
import time
import threading
def sing():
for i in range(5):
print("正在唱歌 .....")
time.sleep(0.5)
def dance():
for i in range(5):
print("正在跳舞 .....")
time.sleep(0.5)
if __name__ == '__main__':
# 如果不使用线程,只能5次唱歌后才能进行5次跳舞,不能同时唱歌跳舞,而且速度慢
# sing()
# dance()
# 使用threading .Thread(target=函数名,args=(参数列表,元组))创建子线程对象
# 线程 传递参数有 三种方式
# 1.元组 threading.thread(target = 函数名,args=(参数1,参数2,......))元组中元素的顺序和函数的参数顺序一致
# 2.字典 threading.thread(target = 函数名,kwargs={“参数名”:"参数值",.....})
# 3.混合元组和字典 threading.thread(target = 函数名,args=(参数1,参数2,......),kwargs={“参数名”:"参数值",.....})
sing_thread = threading.Thread(target=sing)
dance_thread = threading.Thread(target=dance)
# 线程对象.start()启动子线程(子线程同时执行,这样就可以同时唱歌跳舞,但是多线程程序的执⾏顺序是不确定的)
sing_thread.start()
dance_thread.start()
# 使用多线程并发的操作,花费时间要短很多
除了使用threading.Thread创建线程,也可以通过自定义一个类继承threading.Thread类,并且重写run()方法,然后通过实例化自定义对象.start()方法启动自定义线程。调⽤start() ⽅法,但是对象的run⽅法被执⾏了,说明 start⽅法中调⽤了 run⽅法。其中的__init__方法需要调用父类的init方法,super().init()
查看线程数量:
threading.enumerate() 获取当前所有活跃线程的列表。使用len()对列表求长度可以看到当前活跃的线程的个数。
获取当前线程对象:
threading.current_thread() 获取当前线程对象(含名称)
守护线程:如果在程序中将⼦线程设置为守护线程,则该⼦线程会在主线程结束时⾃动退出,设 置⽅式为thread.setDaemon(True),要在thread.start()之前设置。默认是false的,也就是主线程结束 时,⼦线程依然在执⾏。
# 导入时间和线程模块
from time import sleep
import threading
def work1():
for i in range(10):
print("正在执行.....", i)
sleep(0.5)
if __name__ == '__main__':
# 创建子线程
t1 = threading.Thread(target=work1)
# ****设置子线程为守护线程,如果不设置这个,即使主线程exit,子线程也会继续执行****
t1.setDaemon(True)
# 启动子线程
t1.start()
sleep(2)
print("Game Over .....")
exit()
并发:
指的是任务数多于cpu核数,通过操作系统的各种任务调度算法,实现⽤多个任务“⼀起”执 ⾏(实际上总有⼀些任务不在执⾏,因为切换任务的速度相当快,看上去⼀起执⾏⽽已)
真正的并⾏执⾏多任务只能在多核CPU上实现,但是,由于任务数量远远多 于CPU的核⼼数量, 所以,操作系统也会⾃动把很多任务轮流调度到每个核 ⼼上执⾏。
并行:
指的是任务数⼩于等于cpu核数,即任务真的是⼀起执⾏的
多线程 共享 全局变量数据
产生的问题:
线程对全局变量随意修改可能造成多线程之间对全局变量的混乱(即线程⾮安全)
如果多个线程同时对同一个全局变量操作,会造成资源竞争的问题,导致计算结果有误。
解决方法:
优先让某个线程先执行,使用线程对象.join()方法解决,缺点:把多线程变成了单线程,影响了整体性能
如下:
import threading
from time import sleep
# 定义全局变量
num = 0
def work1():
global num
for i in range(1000000):
num += 1
print("work1 num = %d" % num) #1000000
def work2():
global num
for i in range(1000000):
num += 1
print("work2 num= %d" % num) #2000000
if __name__ == '__main__':
work1_thread = threading.Thread(target=work1)
work2_thread = threading.Thread(target=work2)
work1_thread.start()
# work1执行完后再执行work2
work1_thread.join()
work2_thread.start()
# while 循环的作⽤是:能保证⼦线程运⾏完毕,再去输出 num
while len(threading.enumerate()) != 1:
sleep(2)
print("num = ",num) # 2000000
同步:
在多任务中 多个任务执行有先后顺序,一个执行完毕后,另外一个再执行(这个可以解决线程同时修改全局变量)
异步:
在多任务中 多个任务执行没有先后顺序,多个任务同时执行
线程的锁机制:
当线程获取资源后,立刻进行锁定,资源使用完毕后再解锁,有效的保证同一时间只有一个线程在使用资源
互斥锁:
当多个线程几乎同时修改某一个共享数据的时候,需要进行共同控制。线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
# 创建锁
mutex= threading.Lock()
# 锁定
mutex.acquire()
# 释放
mutex.release()
死锁:
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁
进程
进程: 是资源分配的最小单位,也是线程的容器。
程序:程序时固定不变的,而进程会根据运行需要,让操作系统动态分配各种资源(如内存,cpu,磁盘,网络)
进程的状态:
首先导包,import multiprocessing,使用multiprocessing.Process(target=函数名)创建进程对象,使用.start()方法启动进程,当然除了target这个参数,还可以用args元组方式和kwargs字典方式传参
获取当前进程对象名称:multiprocessing.current_process()
设置名称:multiprocessing.Process(target=xxx,name="进程名称")
获取进程号id:multiprocessing.current_process().pid / os.getpid()
获取进程父id:os.getppid()
进程之间 不能共享 全局变量
底层原理:子进程会复制主进程的资源到内部运行
给子进程传递参数方法和给子线程传递参数方法基本一致!
守护进程:p1.daemon=True 设置⼦进程 p1 守护主进程,当主进程结束的时候,⼦进程也随之结束。默认情况下,主进程会等待子进程结束才结束。
p1.terminate() 终⽌进程执⾏,并⾮是守护进程
进程间的通信:
消息队列
====================创建队列,队列的操作写入和读取,获取队列中消息的个数,判断队列是否为空或者已满======================
# 导入模块
import multiprocessing
# 定义消息队列,如果不指定队列长度,则默认为最大,
# 如果指定了消息队列的大小,则消息队列就有上限控制
queue = multiprocessing.Queue(4)
# queue.put()向消息队列中放入任意类型的值
queue.put(1)
queue.put("hello")
queue.put([1, 2, 3])
queue.put({
"name": "wenmei", "age": 18})
# 当超出队列长度时,进入阻塞状态,直到从消息队列腾出空间为止
#queue.put((1, 2, 3))
# 消息队列如果没有空间可写入,则会立刻抛出"Queue.Full"异常
queue.put_nowait((1, 2, 3))
# 判断队列是否已满
isFull = queue.full()
print("isFull -------->",isFull)
print(queue) # <multiprocessing.queues.Queue object at 0x0000021D09E957F0>
# queue.get 获取第一个值
value1 = queue.get()
print(value1)
# queue.get 获取第二个值
value2 = queue.get()
print(value2)
........
value5 = queue.get_nowait()
print(value5)
# 获取队列中消息的个数
print("消息的个数:",queue.qsize())
# 判断队列是否为空
isEmpty = queue.empty()
print("isEmpty------>", isEmpty)
"""
思路:利用队列在两个进程间进行传递,进而实现数据共享
1.准备两个进程
2.准备一个队列,一个进程向队列写入数据,另一进程从队列读数据
"""
#导入模块
import multiprocessing
from time import sleep
# 写入数据到队列的函数
def wirte_queue(queue):
for i in range(10):
# 判断是否已满
if queue.full():
print("队列已满!")
break
queue.put(i)
print("写入成功,已经写入:" ,i)
sleep(0.5)
# 读取队列数据并显示的函数
def read_queue(queue):
while True:
if queue.empty():
print("队列为空!")
break
value = queue.get()
print("已经读取:", value)
if __name__ == '__main__':
# 创建一个空的队列
create_queue = multiprocessing.Queue(5)
# 创建2个进程,分别读写
wirte_process = multiprocessing.Process(target=wirte_queue,args=(create_queue,))
read_process = multiprocessing.Process(target=read_queue,args=(create_queue,))
wirte_process.start()
# 优先写数据,结束后再读取数据
wirte_process.join()
read_process.start()
进程池:当需要创建的⼦进程数量不多时,可以直接利⽤multiprocessing中的Process动态成⽣多个进程,但如 果是上百甚⾄上千个⽬标,⼿动的去创建进程的⼯作量巨⼤,此时就可以⽤到multiprocessing模块提 供的Pool⽅法。
同步方式: pool.apply(),阻塞方式,也就是说,必须等待上一个进程退出才能执行下一个进程
异步方式: pool.apply_async(),非阻塞方式,并行,后面必须有pool.close() pool.join()
=================进程池的创建 multiprocessing.Pool(3) 以及两种工作方式=============================
# 导入相应的模块
import multiprocessing
from time import sleep
# 创建一个函数,用于模拟文件拷贝
def copy_work():
print("正在拷贝......", multiprocessing.current_process())
sleep(0.5)
if __name__ == '__main__':
# 创建一个进程池,长度为3,表示进程池中最多能够创建3个进程
propool = multiprocessing.Pool(3)
for i in range(10):
# 原始方法
# copy_work()# <_MainProcess(MainProcess, started)>
# 先用进程池同步方式拷贝, .apply(函数名,(参数1,参数2....))
# propool.apply(copy_work) #一个一个出现<SpawnProcess(SpawnPoolWorker-2, started daemon)>
# 再用进程池异步方式拷贝
# 注意:1.pool.close()表示不再接受新的任务
# 2.主进程不再等待进程池结束后再退出,需要进程池join()
propool.apply_async(copy_work)# 如果没有使用注意的两点,不会输出任何值三个三个出现
propool.close()
propool.join()# 让主进程等待进程池执行后再退出
============================文件夹拷贝器进程池实现=============================
import os
import multiprocessing
# 文件拷贝函数:
def copy_file(source_path,target_path,file_name):
# 拼接源文件和目标文件的具体路径
print(multiprocessing.current_process())
source_path = source_path + "/" + file_name
target_path = target_path + "/" + file_name
# 打开源文件读取源文件的内容,写入到目标文件中
with open(source_path, "rb") as source_file:
with open(target_path, "wb") as target_file:
while True:
file_data = source_file.read(1024)
if file_data:
target_file.write(file_data)
else:
break
if __name__ == '__main__':
# 定义变量,保存源文件夹、目标文件夹所在路径
source_path = "D:/workspace/mycharm/learn_process/test"
target_path = "D:/test"
# 创建目标路径
try:
os.mkdir(target_path)
except Exception as e:
print("文件夹已经存在!")
# 获取源文件夹中的所有文件(列表)
file_list = os.listdir(source_path)
# 创建进程池
propool = multiprocessing.Pool(3)
for file_name in file_list:
# copy_file(source_path,target_path,file_name) #以前的方式,直接调用函数
propool.apply_async(copy_file,(source_path, target_path, file_name))
propool.close()
propool.join()
"""进程池中的queue通信,与上面的进程间直接通信相比 是在进程池间进程的通信,创建队列是有区别的"""
=======如果要使⽤Pool创建进程,就需要使⽤multiprocessing.Manager()中的Queue(),⽽不是multiprocessing.Queue()===============
#导入模块
import multiprocessing
from time import sleep
# 写入数据到队列的函数
def wirte_queue(queue):
for i in range(10):
# 判断是否已满
if queue.full():
print("队列已满!")
break
queue.put(i)
print("写入成功,已经写入:" ,i)
sleep(0.5)
# 读取队列数据并显示的函数
def read_queue(queue):
while True:
if queue.empty(): # if queue.qsize() == 0:
print("队列为空!")
break
value = queue.get()
print("已经读取:", value)
if __name__ == '__main__':
# 创建进程池
propool = multiprocessing.Pool(3)
# 创建 ====进程池中====的队列======================================
queuepool = multiprocessing.Manager().Queue(5)
# 使用进程池执行任务
# 同步方式
# propool.apply(wirte_queue, (queuepool,))
# propool.apply(read_queue, (queuepool,))
# 异步方式,apply_async()返回值ApplyResult对象,该对象有一个wait()方法
# wait()方法类似join()表示先让进程执行完毕,后续进程才能启动
result = propool.apply_async(wirte_queue,(queuepool,))
result.wait()# 如果没有这个 只能读第一个
propool.apply_async(read_queue,(queuepool,))
propool.close()# 表示不再接收新的任务
propool.join() # 主进程会等待进程池执行结束后再退出
进程和线程对比
区别:
进程是系统进行资源分配的基本单位,线程是进程的一个实体,是CPU调度的基本单位,它是比进程更小的能独立运行的基本单位。
一个程序至少有一个进程,一个进程至少有一个线程
进程在执行过程中拥有独立的内存单元,而多个线程共享内存,只需要到一些必不可少的一点资源,从而极大地提高了程序的运行效率
线程不能独立执行,必须依存在进程中
进程切换慢,线程切换快
选择原则:
- 需要频繁创建销毁的优先使⽤线程;因为对进程来说创建和销毁⼀个进程代价是很⼤的。
- 线程的切换速度快,所以在需要⼤量计算,切换频繁时⽤线程,还有耗时的操作使⽤线程可提⾼应⽤程序的响应
- 因为对CPU系统的效率使⽤上线程更占优,所以可能要发展到多机分布的⽤进程,多核分布⽤线 程
- 并⾏操作时使⽤线程,如C/S架构的服务器端并发线程响应⽤户的请求;
- 需要更稳定安全时,适合选择进程;需要速度时,选择线程更好。
- CPU 密集型 进程优先;I/O 密集型使用线程
在python的原始解释器CPython中存在着GIL(Global Interpreter Lock,全局解释器锁),因此在解释执行python代码时,会产生互斥锁来限制线程对共享资源的访问,直到解释器遇到I/O操作或者操作次数达到一定数目时才会释放GIL。造成了即使在多核CPU中,多线程也只是做着分时切换而已。
协程
可迭代对象 && 迭代器
我们已经知道可以对list、tuple、str、dict、set等类型的数据使⽤for…in…的循环语法从其中依次拿到数据进⾏使⽤,我们把这样的过程称为遍历,也叫迭代。
如何判断一个对象是否是Iterable对象?:使用**isinstance()**判断一个对象是否是Iterable对象,导包:from collections import Iterable
可迭代对象的本质:一个对象所属的类中含有__iter__()方法,该对象就是一个可迭代对象。
迭代器的作用:1)记录当前迭代的位置 2)配合next()获取可迭代对象的下一个元素值。
获取迭代器: iter(可迭代对象)
获取可迭代对象的值: next(迭代器)
for循环的本质:
1)通过iter(要遍历的对象)获取要遍历的对象的迭代器
2)next(迭代器)获取下一个元素
3)帮我们捕获了StopIteration异常
自定义迭代器,满足2类:
1)必须含有__iter__()
2) 必须含有__next__()
迭代器⾃身正是⼀个迭代器,所以迭代器的 iter ⽅法返回⾃身即可。
=====================自定义列表,进行遍历,利用for循环的本质=============================
# 定义MyList类 1.初始化方法2.__iter__()方法,对外提供迭代器
# 3.addItem()方法,用来添加数据
class MyList(object):
def __init__(self):
# 定义实例属性,保存数据
self.items = []
def __iter__(self):
# 创建MyListIterator对象 并返回
mylist_iterator = MyListIterator(self.items)
return mylist_iterator
def addItem(self, data):
self.items.append(data)
print(self.items)
# 自定义迭代器类MyListIterator
# 初始化方法、迭代器方法__iter__()、获取下一个元素值的方法__next__()
class MyListIterator(object):
def __init__(self, items):
# 定义实例属性,保存MyList类传递过来的items
self.items = items
# 记录迭代器迭代的位置
self.current_index = 0
def __iter__(self):
pass
def __next__(self):
# 判断当前的下标是否越界,如果越界,直接抛出异常
if self.current_index < len(self.items):
data = self.items[self.current_index]
self.current_index += 1
return data
else:
raise StopIteration
if __name__ == '__main__':
mylist = MyList()
mylist.addItem("星期一")
mylist.addItem("星期二")
mylist.addItem("星期三")
for value in mylist:
print(value)
=============================迭代器-------斐波拉契数列(Fibonacci)============================
class Fibonacci(object):
def __init__(self, nums):
# 指定生成几个数
self.num = nums
# 记录下标位置的实例属性
self.current_index = 0
# 保存斐波那契数列的前两个
self.a = 1
self.b = 1
def __iter__(self):
return self
def __next__(self):
# 判断生成的个数是否超出范围
if self.current_index < self.num:
# 定义变量,保存a的值
data = self.a
self.a, self.b = self.b, self.a + self.b
# 索引+1(一定要记得)
self.current_index += 1
return data
else:
raise StopIteration
if __name__ == '__main__':
fib = Fibonacci(5)
for value in fib:
print(value, end="") # 1 1 2 3 5
# 除了for循环能接收可迭代对象,list、tuple等也能接收。
fib_list = list(Fibonacci(5))
print(fib_list) #[1, 1, 2, 3, 5]
fib_tuple = tuple(Fibonacci(5))
print(fib_tuple) #(1, 1, 2, 3, 5)
生成器
利⽤迭代器,我们可以在每次迭代获取数据(通过next()⽅法)时按照特定的规律进⾏⽣成。但是我们 在实现⼀个迭代器时,关于当前迭代到的状态需要我们⾃⼰记录,进⽽才能根据当前状态⽣成下⼀个 数据。为了达到记录当前状态,并配合next()函数进⾏迭代使⽤,我们可以采⽤更简便的语法,即⽣成 器(generator)。⽣成器是⼀类特殊的迭代器( 按照一定的规律生成数列)。
=================================创建生成器两种方法===============================
"""
生成器创建的两种方式:
1.列表推导式 2.函数中使用yield
"""
# 列表推导式
data_list1 = [x * 2 for x in range(10)]
print(data_list1) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# 生成器创建方式一,将[]改为 ()
data_generator = (x * 2 for x in range(10))
print(data_generator) # <generator object <genexpr> at 0x00000224F611F6D8>
#对于列表可以直接打印每一个元素,但是对于生成器,应该按照迭代器的使用方法来使用,如next(),for循环、list()等方法
for value in data_generator:
print(value, end=" ") # 0 2 4 6 8 10 12 14 16 18
def test1():
return 10
m = test1()
print("\n m=", m) # 10
# 生成器创建方式二,函数中使用yield
def test2():
yield 10
t = test2()
print(t) # <generator object test2 at 0x000001DFEE2C5570>
value = next(t)
print(value) # 10
=============================生成器实现 ------斐波那契数列=====================================
# 我们将原本在迭代器 __next__ ⽅法中实现的基本逻辑放到⼀个函数中来实现,但是将每次迭代返回数值的return换成了yield,
# 此时新定义的函数便不再是函数,⽽是⼀个⽣成器了。简单来说:只要在def中有yield关键字的 就称为 ⽣成器
def fibonacci(n):
# 定义变量保存第一列和第二列的值
a = 1
b = 1
# 定义变量保存当前生成的位置
current_index = 0
# 循环生成数据,条件(当前的列数<总列数)
print("-------------1111111111111111---------------")
while current_index < n:
data = a
a, b = b, a+b
current_index += 1
print("-------------22222222222222---------------")
yield data
print("-------------33333333333333---------------")
#yield作用 :
# 1.能够充当return的作用
# 2.能够保存程序的状态,并且暂停程序执行
# 3.当next的时候,可以继续唤醒程序从yield位置继续向下执行
if __name__ == '__main__':
fib = fibonacci(5)
# value = next(fib)
# print("第一列:", value)
# value = next(fib) #next函数让⽣成器从断点处继续执⾏,即唤醒⽣成器(函数)
# print("第二列:", value)
for f in fib:
print(f)
输出:
-------------1111111111111111---------------
-------------22222222222222---------------
1
-------------33333333333333---------------
-------------22222222222222---------------
1
-------------33333333333333---------------
-------------22222222222222---------------
2
-------------33333333333333---------------
-------------22222222222222---------------
3
-------------33333333333333---------------
-------------22222222222222---------------
5
-------------33333333333333---------------
生成器-使用注意:
return 作用:可以结束 生成器的运行
send 作用:生成器.send(传递给生成器的值),出来next()可以唤醒生成器外,也可以使用send()函数唤醒,同时还可以传递一个参数,c.next()等价c.send(None)
=====================================生成器-使用注意==========================
def fibonacci(n):
# 定义变量保存第一列和第二列的值
a = 1
b = 1
# 定义变量保存当前生成的位置
current_index = 0
# 循环生成数据,条件(当前的列数<总列数)
print("-------------1111111111111111---------------")
while current_index < n:
data = a
a, b = b, a+b
current_index += 1
print("-------------22222222222222---------------")
recv = yield data
if recv == 1:
return " 我是return,我能让生成器结束!"
print("-------------33333333333333---------------")
#yield作用 :
# 1.能够充当return的作用
# 2.能够保存程序的状态,并且暂停程序执行
# 3.当next的时候,可以继续唤醒程序从yield位置继续向下执行
if __name__ == '__main__':
fib = fibonacci(5)
value = next(fib)
print("第一列:", value)
value = next(fib)
try:
print("第二列:", value)
data = fib.send(1)
# 后面的没有执行,程序终止
value = next(fib)
print("第三列:", value)
except Exception as e:
print(e) # 我是return,我能让生成器结束! 捕获异常
协程
协程,⼜称微线程, 就是你可以暂停执⾏的函数,就像生成器一样。它在不开辟新的线程的基础上,实现多个任务。
线程和进程的操作是由程序触发系统接口,最后的执⾏者是系统; 协程的操作则是程序员。线程的切换⾮常耗性能。但是协程的切换只是单纯的操作CPU的上下⽂,所以⼀秒钟切换个 上百万次系统都抗的住。
协程存在的意义:对于多线程应⽤,CPU通过切⽚的⽅式来切换线程间的执⾏,线程切换时需要 耗时(保存状态,下次继续)。协程,则只使⽤⼀个线程(单线程),在⼀个线程中规定某个代码块 执⾏顺序。
协程的适⽤场景: 当程序中存在⼤量不需要CPU的操作时(IO),适⽤于协程;
协程–yield
from time import sleep
def work1():
while True:
print("正在执行work1.....")
yield
sleep(0.5)
def work2():
while True:
print("正在执行work2.................")
yield
sleep(0.5)
if __name__ == '__main__':
w1 = work1()
w2 = work2()
while True:
next(w1)
next(w2)
协程-----greenlet
greenlet 是python的⼀个C扩展,一个第三方模块,提供可自行调度的微线程,即协程。⽤switch来表示 协程的切换
from time import sleep
from greenlet import greenlet
def work1():
while True:
print("正在执行work1.....")
sleep(0.5)
# 切换到第二个任务
g2.switch()
def work2():
while True:
print("正在执行work2.................")
sleep(0.5)
g1.switch()
if __name__ == '__main__':
# 创建greenlet对象
# greenlet(函数名)
g1 = greenlet(work1)
g2 = greenlet(work2)
# 执行work1任务,并没有使用while True,只是定义先执行哪一个
g1.switch()
协程------gevent
其原理是当⼀个greenlet遇到IO(指的是input output 输⼊输出,⽐如⽹络、⽂件操作等)操作时, ⽐如访问⽹络,就 ⾃动切换 到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执⾏。使用 spawn() 指派任务
from gevent import monkey
monkey.patch_all()
from time import sleep
import gevent
def work1():
while True:
print("正在执行work1.....", gevent.getcurrent())# # <Greenlet at 0x1e07b61f268: work1>
sleep(0.5) # 默认情况下,time.sleep()不能被gevent识别为耗时操作
# 1)把time.sleep()----->gevent.sleep()
#gevent.sleep(0.5)
# 2)给gevent打补丁(目的;让gevent识别time.sleep())
"""
如何打补丁?
1.导入monkey模块 from gevent import monkey
2.破解 monkey.patch_all()
3.以上默认放在开头才起作用
"""
def work2():
while True:
print("正在执行work2.................",gevent.getcurrent()) # <Greenlet at 0x1e07b61f378: work2>
sleep(0.5)
#gevent.sleep(0.5)
if __name__ == '__main__':
# 指派任务 gevent.spawn(函数名,参数1,参数2,,,,)
g1 = gevent.spawn(work1)
g2 = gevent.spawn(work2)
# 让主线程等待协程执行完毕后再退出
g1.join()
g2.join()
进程、线程、协程对比
进程是资源分配的基本单位、线程是CPU调度的基本单位、协程是单线程执行多任务
切换效率:协程 > 线程 > 进程
高效率方式: 进程+ 协程
应用场景:
多进程:密集CPU任务,需要充分使⽤多核CPU资源(服务器,⼤量的并⾏计算)的时候,⽤多进 程。
缺陷:多个进程之间通信成本⾼,切换开销⼤。多线程:密集I/O任务(⽹络I/O,磁盘I/O,数据库I/O)使⽤多线程合适。
缺陷:同⼀个时间切⽚只能运⾏⼀个线程,不能做到⾼并⾏,但是可以做到⾼并发。协程:当程序中存在⼤量不需要CPU的操作时(IO),适⽤于协程;多线程请求返回是⽆序的,哪个线程有数据返回就处理哪个线程,⽽协程返回的数据是有序的。
缺陷:单线程执⾏,处理密集CPU和本地磁盘IO的时候,性能较低。处理⽹络I/O性能还是⽐较⾼.
==========================================协程实现文件下载===================================
from gevent import monkey
monkey.patch_all()
import urllib.request
import gevent
def download_img(imgurl, file_name):
try:
# 根据url地址请求网络资源
url_data = urllib.request.urlopen(imgurl)# 打开⽹址并返回对应的内容(⼆进制流)
# 在本地创建文件,并保存
with open(file_name, "wb") as file:
# 一直读取数据 直到为空
while True:
# 读取网络资源数据(循环)
file_data = url_data.read(1024)
# 把读取的网络资源写入到本地文件中
if file_data:
file.write(file_data)
else:
break
# 做异常捕获
except Exception as e:
print("文件 %s 下载失败!" % file_name)
def main():
# 定义要下载的图片路径
img_url1 = "http://img.mp.itc.cn/upload/20170716/8e1b835f198242caa85034f6391bc27f.jpg"
img_url2 = "http://img.mp.sohu.com/upload/20170529/d988a3d940ce40fa98ebb7fd9d822fe2.png"
img_url3 = "http://image.uczzd.cn/11867042470350090334.gif?id=0&from=export"
# 调用文件下载函数,专门下载文件
# download_img(img_url1, "1.gif")
# download_img(img_url2, "2.gif")
# download_img(img_url3, "3.gif")
# gevent.joinall([协程列表])
gevent.joinall([
gevent.spawn(download_img, img_url1, "1.gif"),
gevent.spawn(download_img, img_url2, "2.gif"),
gevent.spawn(download_img, img_url3, "3.gif")
])
if __name__ == '__main__':
main()
GIL(Global Interpreter Lock(全局解释器锁))
定义:
任何Python线程执⾏前,必须先获得GIL锁,然后,每执⾏100条字节码,解释器就⾃动释放GIL 锁,让别的线程有机会执⾏。这个GIL全局锁实际上把所有线程的执⾏代码都给上了锁,所以, 多线程在Python中只能交替执⾏,即使100个线程跑在100核CPU上,也只能⽤到1个核。
GIL不是Python的特性
GIL是 Python解释器(Cpython) 时引⼊的概念,在JPython、PyPy中没有GIL。GIL并不是Python的语 ⾔缺陷。是解释器层级的锁,跟Python语⾔特性⽆关
GIL存在的原因
- 早期计算机都是单核设计
- CPython在执⾏多线程的时候并不是线程安全的,所以为了程序的稳定性,加⼀把全局解释锁,能 够确保任何时候都只有⼀个Python线程执⾏
GIL的弊端
- GIL对计算密集型的程序会产⽣影响。因为计算密集型的程序,需要占⽤系统资源。
- GIL的存在,相当于始终在进⾏单线程运算,这样⾃然就慢了。
- IO密集型影响不⼤的原因在于,IO,input/output,这两个词就表明程序的瓶颈在于输⼊所耗费的 时间,线程⼤部分时间在等待,所以它们是多个⼀起等(多线程)还是单个等(单线程)⽆所谓 的
GIL锁什么时候释放?
- 在当前线程执⾏超时后会⾃动释放
- 在当前线程执⾏阻塞操作时会⾃动释放
- 当前执⾏完成时
GIL面试题
严重问题:
既然CPython解释存在GIL是否意味每个线程在全局变量就不⽤加Lock互斥锁了呢? 这是⼀个严重错误的想法,为什么⽤户操作全局数据还需要加Lock,因为GIL的释放时机我们⽆ 法控制-操作⾮常可能并没有完成,⽽不像Lock那样我们⽤完才释放(操作完整)。
htop:可以观察CPU占用情况
123456…是CPU核心编号
资源的消耗情况:
- 单进程 1核
- 多进程 多核
- 多线程 多核但是很低(GIL锁问题)
GIL锁解决方案 - 解决方案一(不推荐)
换一个解释器执行程序(jython:用JAVA写的python解释器) - 解决方案二(推荐,但是进程会占用更多的系统资源)
使用多进程替换多线程multiprocessing是一个多进程模块,开多个进程,每个进程都带有一个GIL,就相当于多线程来用了 - 解决方案三(推荐)
使用python语言的特性:胶水
(我们让子线程部分用c来写,实质上也相当于那部分代码绕过来cpython解释器)
首先编写一个test.c代码,如:
void Loop()
{
while(1)
{
;
}
}
然后将其编译成.so文件(share object),需要执行如下命令:
gcc test.c shared -o libtest.so
-shared:将其编译成so⽂件 -o:表示output,⽤来输出的⽂件名 库⽂件是以lib开头 编译过后,在当前的⽂件夹下就会⽣成⼀个.so⽂件
接着编写python文件,使用上面得到的.so文件:
import ctypes
import threading
# 加载动态库
my_lib = ctypes.cdll.LoadLibrary("./libtest.so")
#创建子线程
t = threading.Thread(target = my_lib.Loop)
t.start()
# 主线程
while True:
pass
注意:
CPU密集型不太适合多线程
I/O 密集型适合多线程(GIL锁会释放)
转载:https://blog.csdn.net/xwmwanjy666/article/details/107596478