目录
- 并发编程,day1
- 操作系统发展史
- 多道技术
- 进程
- 程序运行的三种状态
- 同步与异步,阻塞非阻塞
- 开启进程的两种方式
- join方法
- 并发编程,day2
- 进程对象及其他方法
- 僵尸进程与孤儿进程(了解)
- 守护进程
- 互斥锁
- 队列介绍queue
- 进程间通信IPC机制
- 生产者消费者模型
- 线程相关知识点
- 并发编程day3
- 开启线程的两种方式
- TCP服务端实现并发的效果
- 线程对象的join方法
- 线程间数据共享
- 线程对象属性及其他方法
- 守护线程
- 线程互斥锁
- GIL全局解释器锁
- GIL与普通互斥锁的区别
- 同一个进程下的多线程无法利用多核又是,是不是就没有用了
- 并发编程day4
- 死锁与递归锁(了解)
- 信号量(了解)
- Event事件(了解)
- 各种队列(了解)
- 进程池与线程池(掌握)
- 协程基本原理(了解)
- genvet模块
- 协程实现TCP服务端的并发
- 并发编程day5
- IO模型简介
- 阻塞IO
- 非阻塞IO
- IO多路复用
- 异步IO模型,asyncio模块
- 网络并发知识点梳理
并发编程,day1
操作系统发展史
'''
主要是围绕着cpu的利用率问题
'''
多道技术
'''
单核实现并发的效果
并发:看起来像同时运行的就可以叫做并发
并行:真正意义上的同时运行
空间与时间上的复用
空间上:
多个程序共用一套计算机硬件
时间上:
切换+保存状态
'''
#切换分为两种
#1、当一个程序遇到io操作,操作系统会立刻剥夺该程序的cpu执行权限(提供了cpu利用率且不影响程序执行效率)
#2、当一个程序长时间占用cpu,操作系统也会立刻剥夺该程序cpu执行权限(降低了程序的运行效率,但是玩出了并发效果)
进程
'''
程序就是一堆死代码
进程则是正在执行的代码
'''
#进程调度算法
1、先来先服务调度算法
2、短作业优先调度算法
3、时间片轮转法,多级反馈队列
程序运行的三种状态
'''
就绪态:一切程序必须要先过就绪状态才能加入运行态
运行态:正在被cpu执行
阻塞态:程序遇到io操作了
理想:希望开发的程序一致处于就绪态与运行态
'''
同步与异步,阻塞非阻塞
同步与异步
'''任务提交方式'''
同步:任务提交之后原地的等待任务的返回结果,期间不做任何事情
异步:任务提交之后不原地等待任务返回结果,直接执行下一行代码
阻塞非阻塞
'''程序运行状态'''
阻塞:阻塞态
非阻塞:就绪态,运行态
开启进程的两种方式
'''
from mutiprocessing import Proces
1、实例化产生对象
2、类的继承,run方法
ps:
在windows里面开启进程代码一定要写在main代码块内
创建一个进程就是申请一个内存空间将代码丢进去
'''
第一种方式:
from multiprocessing import Process
import time
def task(name):
print('%s is running'%name)
time.sleep(3)
print('%s is over'%name)
if __name__ == '__main__':
#1、创建一个对象
p = Process(target=task,args=('jason',))
#容器类型哪怕里面只有一个元素,建议要用逗号隔开
#2、开启进程
p.start() #告诉操作系统帮你创建一个进程
print('主') #代表程序执行完毕
#第二种方式
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self) -> None:
print('hello bf girl')
time.sleep(1)
print('get out!')
if __name__ == '__main__':
p = MyProcess()
p.start()
print('主')
join方法
'''
主进程等待子进程代码运行完毕后再往下执行代码
'''
并发编程,day2
进程对象及其他方法
'''
一台计算机上面运行很多进程,计算机会给每一个进行的进程分配一个PID号
如何查看:
windows电脑: tasklist |findstr 22
linux:ps -aux
'''
from multiprocessing import Process,current_process
current_process().pid #查看当前进程的进程号
os.getpid() #查看当前进程进程号
os.getppid() #查看当前进程的父进程号
p.terminate() #杀死当前进程
p.is_alive() #判断当前进程是否存活 True或者False
僵尸进程与孤儿进程(了解)
#僵尸进程
'''
死了但是没有死透
当你开设子进程后 该进程死后不会立刻释放占用的进程号
因为我要让父进程查看到他开设的子进程的一些基本信息 占用的pid号 运行时间等
所有的进程都会步入僵尸进程
父进程不死并且在无限制的创建子进程也不结束
回收子进程占用的pid号
父进程等待子进程运行结束
或者父进程调用join方法
'''
#孤儿进程
'''
子进程存活,父进程意外死亡
操作系统会开设一个儿童福利院专门管理孤儿进程,回收相关资源
'''
守护进程
#守护进程的主进程死了,守护进程则陪葬
from multiprocessing import Process
import time
def tesk(name):
print('%s总管正在活着'%name)
time.sleep(3)
print('%s总管正在死亡'%name)
if __name__ == '__main__':
p = process(target = task,arg=('egon',))
p.deamon = True
p.start()
print('皇帝jason寿终正寝')
互斥锁
多个进程操作同一份数据的时候,会出现数据 错乱问题
解决方法:加锁处理,将并发变成串行,牺牲效率,保证数据的安全
import random
from multiprocessing import Process,Lock
import time
import json
def search(i):
#文件操作读取票数
with open('data','r',encoding='utf-8') as f:
dic = json.load(f)
print('用户%s查询余额'%(i,dic.get('ticket_num')))
def buy(i):
#先查票
with open('data','r',encoding='utf-8') as f:
dic = json.load(f)
#模拟网络演示
time.sleep(random.randint(1,3))
#判断当前是否邮票
if dic.get('ticket_num') >0:
#修改数据库 买票
dic['ticket_num'] -= 1
#写入数据库
with open('data','w',encoding='utf-8') as f:
json.dump(dic,f)
print('用户%s买票成功'%i)
else:
print('用户%s买票失败'%i)
def run(i,mutex):
search(i)
mutex.acquire()
#给买票环节加锁
#枪锁
buy(i)
#释放锁
mutex.release()
if __name__ == '__main__':
#在主进程中生成一把锁,让所有子进程抢,谁先抢到谁先买票
mutex = Lock()
for i in range(1,11):
p = Process(target=run,args=(i,mutex))
p.start()
'''
扩展 行锁 表锁
注意:
1、锁不要轻易使用,容易造成死锁现象
2、锁旨在处理数据的部分加来保证数据安全
'''
GIL全局解释器锁
队列介绍queue
队列Queue模块
'''
管道:subprocess
stdin stdout stderr
队列:管道+锁
队列:先进先出
堆栈:先进后出
'''
import queue
#创建一个队列
q = queue.Queue(5) #括号内可以传数字,表示生成队列最大可以同时存放的数据量
#往队列中存数据
q.put(111)
q.put(111)
q.put(111)
print(q.full()) #判断当前队列是否满了
q.put(111)
q.put(111)
# q.put(111)
#当队列数据放满了之后,如果还有数据要放,程序会阻塞,直到有位置让出来
#去队列中取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
print(q.empty()) #判断当前队列是否为空
v4 = q.get()
v5 = q.get()
v6 = q.get_nowait() #没有数据直接报错
# v6 = q.get(timeout=3) #没有数据会原地等三秒,再报错
#队列中如果已经没有数据的话,get方法会原地阻塞
'''
q.full()
q.empty()
q.get_nowait()
再多进程的情况下是不精确
'''
进程间通信IPC机制
from multiprocessing import Queue,Process
'''
研究思路
1、主进程更子进程借助队列通信
2、子进程跟子进程借助队列通信
'''
def producet(q):
q.put('我是1号技师 很高兴为您服务')
def consumer(q):
print(q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=producet,args=(q,))
p1 = Process(target=consumer,args=(q,))
p.start()
p1.start()
生产者消费者模型
'''
生产者:生产/制造东西的
消费者:消费/处理东西的
该模型除了上述两个之外还需要一个媒介
生活中的例子,做包子的将包子做好后放在蒸笼(媒介)里面,,买包子的去蒸笼里面拿
生产者(做包子的)+消息队列(蒸笼)+消费者(吃包子的)
'''
from multiprocessing import Queue,Process,JoinableQueue
import random
import time
def producet(name,food,q):
for i in range(3):
data = '%s生产了%s%s'%(name,food,i)
#模拟延迟
time.sleep(random.randint(1,3))
print(data)
#将数据放入队列中
q.put(data)
def consumer(name,q):
# 消费者胃口很大 光盘行动
while True:
food = q.get() #没有数据就会卡住
#判断当前是否有结束的标识
# if food is None:break
time.sleep(random.randint(1,3))
print('%s吃了%s'%(name,food))
q.task_done() #告诉队列你已经从里面取出一个数据并处理完毕了
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producet,args=('大厨','包子',q))
p2 = Process(target=producet,args=('马叉虫tank','泔水',q))
c1 = Process(target=consumer,args=('春哥',q))
c2 = Process(target=consumer, args=('马哥', q))
p1.start()
p2.start()
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
#等待生产者生产完毕之后,往队列中添加特定的结束符号
# q.put(None) #肯定再所有生产者生产数据的末尾
# q.put(None)
q.join() #等待列队中所有数据被取玩,再往下执行代码
'''
JoinableQueue 每当队列存入数据,内部会有一个计时器+1
每当你调用task_done的时候 计数器-1
q.join() 当计数器为0的时候 才往后运行
'''
#只要q.join执行完毕,说明消费之已经处理玩数据,消费者就没有存在的必要了
#可以通过守护进程来处理
线程相关知识点
1、什么是线程
'''
进程:资源单位
线程:执行单位
将操作系统比喻成一个大的工程
那么进程就相当于工厂里面的车间
而线程就是车间里面的流水线
每一个进程肯定自带一个线程
再次总结:
进程:资源单位,起一个进程仅仅只是在内存空间中开辟一块独立的空间
线程:执行单位,真正被cpu执行的起始是进程里面的线程,线程指的是代码的执行过程,代码中需要使用到的资源都找所再的进程索要
进程和线程都是虚拟单位,只是为了让我们方便的描述问题
'''
2、为何要有线程
'''
开设进程
1、申请内存空间 耗资源
2、'拷贝代码' 耗资源
开线程:
一个进程内可以开设多个线程,在用一个进程内开设多个线程时无须再次申请内存空间及拷贝代码的操作
总结:
开设线程的开销远远小于进程的开销
同一个进程下的多个线程数据是共享的
例子:
我们开发一款文本编辑器
获取用户输入功能
实时展示到屏幕功能
自动保存在硬盘的功能
正对以上三个功能,开设进程合适还是线程合适?
开三个线程处理上面的三个功能更加合理
'''
并发编程day3
开启线程的两种方式
开进程和开线程的步骤基本都是一样的,只是导入模块不一样
类的对象调用方法
类的继承重写run方法
第一种方法:
from multiprocessing import Process
from threading import Thread
import time
def task(name):
print('%s is running'%name)
time.sleep(1)
print('%s is over')
#开启线程不需要在main下执行代码,直接书写就可以
#但是我们还是习惯性的将启动命令卸载main下面
t = Thread(target=task(),args=('egon',))
t.start() #创建线程的开销非常小,创建的非常快
print('主')
第二种方法:
from multiprocessing import Process
from threading import Thread
import time
class Mythead(Thread):
def __init__(self,name):
'''针对给类里面的参数传参需定义一个init方法'''
#重写了别人的方法 又不知道别人的方法里有啥 就调用父类的方法
super().__init__()
self.name = name
def run(self) -> None:
print('%s is running'%self.name)
time.sleep(1)
print('egon')
#开启线程不需要在main下执行代码,直接书写就可以
#但是我们还是习惯性的将启动命令卸载main下面
if __name__ == '__main__':
t = Mythead
t.start()
print('主')
TCP服务端实现并发的效果
服务端:
import socket
from threading import Thread
from multiprocessing import Process
def talk(conn):
# 通信循环
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
# 连接循环
def server(ip, port):
server = socket.socket() # 括号不加参数默认TCP
server.bind(('127.0.0.1', 8081))
server.listen(5)
while True:
conn, addr = server.accept()
t = Thread(target=talk, args=(conn,))
# t = Process(target=talk, args=(conn,)) #开启进程
t.start()
if __name__ == '__main__':
s = Thread(target=server, args=('127.0.0.1', 8080))
s.start()
客户端:
import socket
client = socket.socket()
client.connect(('127.0.0.1',8081))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data.decode('utf-8'))
线程对象的join方法
import ntpath
import stat
from threading import Thread
import time
def task(name):
print('')
if __name__ == '__main__':
p = Thread(target=task, args=('engon',))
p.start()
p.join() #主线程等待子进程结束后执行
print('主')
线程间数据共享
import ntpath
import stat
from threading import Thread
import time
money = 100
def task():
global money
money = 666
print(money)
if __name__ == '__main__':
t = Thread(target=task,)
t.start()
t.join()
print(money)
线程对象属性及其他方法
from threading import Thread,active_count,current_thread
import os,time
def task():
# print('hello',os.getpid())
print('hello',current_thread().name) #子线程的名字
time.sleep(1)
if __name__ == '__main__':
t = Thread(target=task)
t1 = Thread(target=task)
t.start()
t1.start()
# print('主',os.getpid())
# print('主', current_thread().name) #主线程的名字
print('主', active_count()) #统计当前正在活跃的线程数
守护线程
from threading import Thread
import time
def task(name):
print('%s is running'%name)
time.sleep(1)
print('%s is over'%name)
if __name__ == '__main__':
t = Thread(target=task,args=('hongwei',))
t.daemon = True
t.start()
print('主')
'''
主线程运行结束之后不会立刻结束,会等待其他非守护线程结束才结束
应为主线程的结束意味着所在的进程的结束
'''
例子(思考打印顺序):
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print('end123')
def func():
print(456)
time.sleep(3)
print('end456')
if __name__ == '__main__':
t1 = Thread(target=foo)
t2 = Thread(target=func)
t1.daemon = True
t1.start()
t2.start()
print('主')
线程互斥锁
from threading import Thread
from multiprocessing import Lock
import time
mutex = Lock()
money = 100
def task():
global money
mutex.acquire() #加锁
tmp = money
time.sleep(0.1)
money = tmp - 1
mutex.release() #解锁
if __name__ == '__main__':
t_list = []
for i in range(20):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
GIL全局解释器锁
Ps:博客园密码:xiaoyuanqujing@666
'''
python解释器起始有多个版本
Cpython
Jpython
pypypython
但是普遍使用的都是Cpython解释器
在Cpython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多线程的同时执行
同一个进程下的多个线程无法利用多核又是!!!
疑问:python的多线程是不是一点用都没有,无法利用多核优势
应为cpython中的内存管理不是线程安全的
内存管理(垃圾回收机制)
1、应用计数:当一个在内存中的值身上有应用的时候,应用计数加一,销毁的时候,应用技术减一
2、标记清楚:当程序在运行的时候发现内存占的空间大,会自动停止当前程序的运行,然后自动扫描当前程序所占用的所有变量名的应用计数,把应用技术为0的进行一个标记,然后一次性将其清楚释放内存
3、分代回收:为了减少垃圾回收消耗的资源
'''
'''
重点:
1、GIL不是python的特点而是Cpython解释器的特点
2、GIL是保证解释器级别的数据的安全
3、GIL会导致同一个进程下的多个线程的无法同时执行
4、针对不同的数据还是需要加不同的锁处理
5、解释型语言的通病:同一个进程多个线程无法利用多核优势
'''
GIL与普通互斥锁的区别
import time
from threading import Thread,Lock
mutex = Lock()
money = 100
def task():
global money
mutex.acquire() #上锁
tmp = money
time.sleep(0.1) #睡一下会释放GIL锁
money = tmp - 1
mutex.release()
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
'''
100个线程骑起来之后 要先去抢GIL
我进入io GIL会自动释放,但我手上还有一个自己的互斥锁
其他线程虽然抢到了GIL但是抢不到互斥锁
最终GIL还是回到你的手上,你再去操作数据
'''
同一个进程下的多线程无法利用多核又是,是不是就没有用了
'''
多线程是否有用要看具体情况
单核:四个任务(IO密集型计算密集型)
多核:四个任务(IO密集型计算密集型)
'''
#计算密集型 每个任务都需要10s
单核(不用考虑,现在都是多核):
多进程:额外的消耗资源
多线程:减少开销
多核:
多进程:假设4个任务,四个cpu同时帮忙算,总耗时10s+
多线程:总耗时 40s+ (一个进程下多线程无法利用多核优势)
#IO密集型 每个任务都需要10s,cpu基本不工作了,只是来回切换,线程和进程都无法利用多核优势
多核:
多进程:相对浪费资源
多线程:更加节省资源
计算密集型举例:
#计算密集型
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res = 1
for i in range(1,100000):
res *= i
if __name__ == '__main__':
l = []
print(os.cpu_count()) #获取当前计算机CPU个数
start_time = time.time()
for i in range(4):
p = Process(target=work) #5.4251203536987305
t = Thread(target=work) #10.328745126724243
# p.start()
# l.append(p)
t.start()
l.append(t)
for p in l:
p.join()
print(time.time()-start_time)
#多进程胜出
IO密集型距离:
#IO密集型
from multiprocessing import Process
from threading import Thread
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l = []
print(os.cpu_count()) #获取当前计算机CPU个数
start_time = time.time()
for i in range(40):
p = Process(target=work) #2.947589159011841
t = Thread(target=work) #2.0187268257141113
# p.start()
# l.append(p)
t.start()
l.append(t)
for p in l:
p.join()
print(time.time()-start_time)
#多线程胜出
总结:
'''
多进程和多线程都有各自的优势,我们通常可以再多进程下面开设多线程,这样既可以利用多核,也可以减少资源消耗
'''
并发编程day4
死锁与递归锁(了解)
锁的使用 枪锁必须要释放锁,其实再操作锁的时候也极容易产生死锁现象(整个程序卡死 阻塞)
死锁:
线程1抢到func2的B锁,睡两秒,再回来抢A锁,发现A锁被线程2抢走,还未释放,线程2抢完A锁后接着抢B锁,发现B锁被线程1抢走,这样就造成死锁现象
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
#类只要加括号多次,产生的肯定是不同的对象
#如果想实现多次加括号得到的是相同的对象,单例模式
class MyThead(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('%s 抢到A锁'%self.name) #获取当前线程名
mutexB.acquire()
print('%s 抢到b锁'%self.name)
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print('%s 抢到b锁'%self.name)
time.sleep(2)
mutexA.acquire()
print('%s 抢到A锁'%self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = MyThead()
t.start()
递归锁:
可以被连续的acquire和release
但是只能被第一个抢这把锁执行上述操作
它的内部有一个计数器 每acquire一次 计数器加一 每一次realse一次 计数减一,只要计数不为0 那么其他人都无法抢到该锁
示例:
#将上述的
mutexA = Lock()
mutexB = Lock()
#换成
mutexA = mutexB = RLock()
信号量(了解)
信号量在不同阶段可能对应不同技术点
在并发编程中,信号量指的是锁
'''
如果我们将互斥锁比喻成一个厕所的话
那么信号量就相当于多个厕所
'''
from threading import Thread, Semaphore
import time
import random
sm = Semaphore(5) # 括号内写数字,写几就表示开设几个坑位
def task(name):
sm.acquire()
print('%s 正在蹲坑' % name)
# time.sleep(3)
time.sleep(random.randint(1, 3))
sm.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task, args=('伞兵%s号' %i, ))
t.start()
Event事件(了解)
一些进程/线程 需要等待另一些进程/线程 运行完毕之后才能运行,类似于发射信号一样
from threading import Thread, Event
import time
event = Event() # 造了一个红绿灯
def light():
print('红灯亮着的')
time.sleep(3)
print('绿灯亮了')
# 告诉等待红灯的人可以走了
event.set()
def car(name):
print('%s 车正在等红灯' % name)
event.wait() # 等待别人给你发消息
print('%s 车加油门飙车走了' % name)
if __name__ == '__main__':
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=car, args=('%s' % i, ))
t.start()
各种队列(了解)
"""
同一个进程下多个线程数据是共享的
为什么在同一个进程下还会使用队列呢?
应为队列是:(多个线程在操作同一份数据的时候会导致数据不安全)
管道 + 锁
所以用队列还是为了保证数据的安全
"""
# 我们现在使用的队列都是只能在本地测试使用
# 1、先进先出queue
# import queue
# q = queue.Queue(3)
# q.put(1)
# q.get()
# q.get_nowait()
# q.get(timeout=3)
# q.full()
# q.empty()
# 2、后进先出LifoQueue
# import queue
#
# q = queue.LifoQueue()
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get()) #3
# 3、优先级PriorityQueue 可以给放入队列中的数据设置进出优先级
import queue
q = queue.PriorityQueue(3)
q.put((10, '111'))
q.put((11, '222'))
q.put((-1, '333'))
print(q.get())
# put括号内放一个元组 第一个放的数字表示优先级
# 数字越小优先级越高
进程池与线程池(掌握)
线程池:
"""
回顾之前TCP并发是通过每来一个人就开设一个进程或者线程去处理
"""
"""
无论开设进程也好,还是开设线程也好 都是需要消耗资源
只不过开设线程比开设进程的稍微小一点而已
我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!
硬件开发的速度远远赶不上软件
我们的宗旨是在保证计算机硬件能正常工作的情况下最大限度利用它
"""
# 池的概念
"""
什么是池?
池是用来保证计算机硬件安全的情况下最大限度的利用计算机
他降低了程序的运行效率但保证了计算机硬件的安全,从而让写的程序正常运行
"""
# 基本使用:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor()
# 括号内可以传数字 不传的花默认会开设当前计算机cpu个数五倍的线程
"""
池子造出来之后 里面会固定存五个线程
这五个线程不会出现重复创建和销毁
池子的使用:
只需要将做的任务往池子中提交就好,自动会有人来服务
"""
def task(n):
print(n)
time.sleep(0.5)
return 'asd'
"""
任务的提交方式
同步:提交之后等待返回结果在往下执行
异步:提交之后不等待返回结果,执行完直接往下执行
如何获取返回结果?
"""
# pool.submit(task, 1) # 朝池子中提交任务 异步提交
# print('主')
t_list = []
for i in range(20): # 朝池子中提交20个任务
res = pool.submit(task, i) #
# print(res)
# print(res.result()) # result方法:1、同步提交类似join 2、打印返回task的结果
t_list.append(res)
# 等待线程池中所有任务执行完毕之后再继续往下执行
pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕
for t in t_list:
print('>>>:', t.result()) # 结果是有序的
进程池:
# 基本使用:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
pool = ProcessPoolExecutor()
# 括号内可以传数字 不传的花默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存五个进程
这五个进程不会出现重复创建和销毁
池子的使用:
只需要将做的任务往池子中提交就好,自动会有人来服务
"""
def task(n):
print(n, os.getpid())
time.sleep(0.5)
return 'asd'
def call_back(n):
print('call_back', n.result())
"""
任务的提交方式
同步:提交之后等待返回结果在往下执行
异步:提交之后不等待返回结果,执行完直接往下执行
如何获取返回结果?
异步提交任务返回结果 应该通过回调机制来获取
回调机制
就相当于给每个异步任务绑定了一个定时炸弹
一旦该任务有结果立刻触发爆炸
"""
if __name__ == '__main__':
# pool.submit(task, 1) # 朝池子中提交任务 异步提交
# print('主')
t_list = []
for i in range(20): # 朝池子中提交20个任务
res = pool.submit(task, i).add_done_callback(call_back)
t_list.append(res)
总结:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor() #创建进程池
pool = ThreadPoolExecutor() #线程创建池
res =pool.submit(task, i) #submit会返回一个future类的对象,该对象调用result就能获取到任务的结果
异步回调机制:
给每一个异步提交的任务绑定一个方法,一旦任务有结果了会立刻自动触发该方法
def call_back(n):
print('call_back', n.result())
res =pool.submit(task, i).add_done_callback(call_back)
#注意异步回调函数拿到的是一个对象,也需要result才能拿到结果
协程基本原理(了解)
进程:资源单位
线程:执行单位
协程:单进程下实现并发,一个并不是实际存在的东西
我们程序员自己在代码层面上检测我们所有io操作
一旦遇到io了,我们在代码级别完成切换
这样给cpu的感觉是你这个程序一致运行,没有IO
从而提升程序的运行效率
多道技术
切换+保存状态
cpu两种切换
1、程序遇到io
2、程序长时间占用
代码如何做到
切换+保存状态 (yield)
#保存上次我执行的状态 下一次接着上一次的操作继续往后执行
#切换不一定是提升效率,也有可能是降低效率,IO切提升效率,没有IO切降低效率
验证切换是否一定提升效率:
import time
# def func1():
# for i in range(100000000):
# i+1
#
#
# def func2():
# for i in range(100000000):
# i+1
def func1():
while True:
100000000 + 1
yield
def func2():
g = func1() # 初始化生成器
for i in range(100000000):
i + 1
next(g)
start_time = time.time()
func1()
func2()
print(time.time() - start_time)
genvet模块
from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn #检测机制
"""
genvet监测io
gevent模块本身无法检测常见的一些io操作
apawn在检测的时候是异步的
需要再使用的时候额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面两句话在使用gevent模块的时候肯定要导入,所以还支持简写
from gevent import monkey;monkey.patch_all()
"""
def heng():
print('哼')
time.sleep(2)
print('哼')
def ha():
print('哈')
time.sleep(3)
print('哈')
start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g1.join()
g2.join() #等待被检测的任务执行完毕,再往后执行
print(start_time - time.time())
协程实现TCP服务端的并发
服务端:
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
"""
不开启进程和线程实现 TCP服务端并发
"""
import socket
def communication(conn):
# 通信循环
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
# 连接循环
def server(ip, port):
server = socket.socket() # 括号不加参数默认TCP
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
conn, addr = server.accept()
spawn(communication, conn)
if __name__ == '__main__':
g1 = spawn(server, '127.0.0.1', 8080)
g1.join()
客户端:
from threading import Thread, current_thread
import socket
def xclient():
client = socket.socket()
client.connect(('127.0.0.1', 8080))
n = 0
while True:
msg = '%s say hello %s'%(current_thread().name, n)
n += 1
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
if __name__ == '__main__':
for i in range(50):
t = Thread(target=xclient)
t.start()
总结:
多进程下面开设多线程
多线程下面再利用协程
最大程度提升软件运行效率
并发编程day5
IO模型简介
'''
正对网络io的模型:
1、blocking IO 阻塞IO
2、nonblocking IO 非阻塞IO
3、IO multiplexing IO多路复用
4、asynchronous IO 异步IO
5、signal driven IO 信号驱动IO(并不常用,不做了解)
'''
同步异步:任务的提交方式,结果可以通过回调机制来接收
阻塞非阻塞:
常见的网络阻塞状态:
1、accept
2、recv
3、recvfrom
send虽然也有io行为,但是不在我们考虑范围
阻塞IO
'''
之前写的都是阻塞IO,协程除外
'''
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
"""
不开启进程和线程实现 TCP服务端并发
"""
server = socket.socket() # 括号不加参数默认TCP
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
conn, addr = server.accept()
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
# 再服务端开设多进程或者多线程 进程池线程池 其实还是没有解决IO问题
该等的地方还是得等,没有规避
只不过多个人等待,彼此互不干扰

非阻塞IO
服务端:
import time
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
"""
不开启进程和线程实现 TCP服务端并发
"""
server = socket.socket() # 括号不加参数默认TCP
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
# 将所有网络阻塞变为非阻塞
r_list = []
del_list = []
while True:
try:
conn, addr = server.accept()
r_list.append(conn)
except BlockingIOError as e:
# print('做其他事')
# time.sleep(0.1)
# print('列表长度:',len(r_list))
for conn in r_list:
try:
data = conn.recv(1024) # 没有消息 报错
if len(data) == 0: # 客户端断开连接
conn.close() # 关闭conn
# 将无用的conn从r_list删除
del_list.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
# 删除列表中的通信对象
del_list.append(conn)
# 回收无用连接
for conn in del_list:
r_list.remove(conn)
del_list.clear()
客户端:
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
client.send(b'hello')
data = client.recv(1024)
print(data)
总结:
"""
虽然非阻塞IO感觉很强
但是该模型会 长时间占用着CPU不干活,让cpu不停空转
我们实际应用中也不会考虑使用非阻塞IO模型
"""

IO多路复用
"""
当监管对象只有一个的时候 其实IO多路复用连IO阻塞都比不上!
但是IO多路复用可以一次性监管很多个对象
server = socket.socket()
conn, addr = server.accept()
监管机制是操作系统本身就有的 如果你想要用该监管机制(select)
需要你导入对应的select模块
"""
客户端:
import select
import socket
server = socket.socket() # 括号不加参数默认TCP
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
# 将所有网络阻塞变为非阻塞
read_list = [server]
while True:
r_list, w_list, x_list = select.select(read_list, [], [])
"""
帮忙监管,一旦有人来了,立刻返回对应的监管对象
"""
# print(r_list)
for i in r_list:
"""正对不同对象做不同的处理"""
if i is server:
conn, addr = i.accept()
# 也应该添加到监管的队列中
read_list.append(conn)
else:
res = i.recv(1024)
if len(res) == 0:
i.close()
read_list.remove(i)
continue
print(res)
i.send(b'heiheiheiheihei')
服务端:
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
client.send(b'hello')
data = client.recv(1024)
print(data)
总结:
"""
监管机制其实有很多
select机制:
windows linux都有
poll机制 :
只有linux有 poll和select都可以监管多个对象,但poll监管的数量更多
ps:上述select和poll机制其实都不是很完美,当监管对象特别多的时候可能会出现及其大的延迟响应
epoll机制:
只有linux有
它给每一个监管对象都绑定一个回调机制
一旦有响应,回调机制立刻发起提现
正对怎么选这些机制可以通过selectors模块来自动选择
"""

异步IO模型,asyncio模块
"""
异步IO模型是所有模型中效率最高的 也是使用最广泛的
相关的模块和框架
模块:asyncio模块
异步框架:sanic tronado twisted
速度快!!!
"""
import threading
import asyncio
"""
单线程模拟高并发
"""
@asyncio.coroutine
def hello():
print('hello world %s' % threading.current_thread())
yield from asyncio.sleep(1) # 换成真正的IO操作
print('hello world %s' % threading.current_thread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
网络并发知识点梳理
网络编程:
1、软件开发架构
2、互联网协议
-osi七层模型
-TCP,UDP
3、三次握手四次挥手(重点)
4、socket简介
5、TCP粘包问题, 定制固定长度的报头
6、UDP协议
7、socketserver模块 实现服务端并发
并发编程:
1、操作系统发展史
2、多道计数
3、进程理论
4、开启进程的两种方式
5、互斥锁
6、生产者消费者模型
7、线程理论
8、开启线程的两种方式
9、GIL全局解释器锁
10、进程池 线程池
11、协程的概念
12、IO模型的了解