多进程与多线程的对比 多任务的实现原理 一般地, 设计Master-Worker模式, Master负责分配任务, Worker负责执行任务, 所以在多任务环境下, 一般是由一个Master, 多个Worker.
多进程 主进程就是Master, 其他进程就是Worker
优点: 稳定性好: 一个子进程崩溃了, 不会影响主进程和其他子进程. 当然的, 主进程挂了后, 所有进程就全挂了, 但是Master进程只负责分配任务, 挂掉的概率很低.
缺点:
创建进程的代价大: 在Unix/Linux系统下, 用fork调用还行, 在windows下创建进程开销巨大.
操作系统能同时运行的进程数也是有限的: 在内存和CPU的限制下, 如果有几千个进程同时运行, 操作系统连调度都成问题.
多线程 主线程就是Master, 其他线程就是Worker
优点:
多线程模式模式通常比多进程快一点, 但是也快的不多
在windows下, 多线程的效率比多进程要高
缺点:
任何一个线程挂掉都可能直接造成整个进程崩溃(所有线程共享进程的内存. 在windows上, 如果一个线程执行的代码除出了问题, 可以进程看到这样的提示:”该程序执行了非法操作, 即将关闭”, 其实往往是某个线程出了问题, 但是操作系统会强制结束整个进程)
计算密集型 与 IO密集型
计算密集型: 适合多进程
要进行大量的计算, 消耗CPU资源, 比如计算圆周率/对视频进行高清解码等, 全靠CPU的运算能力. 这种计算密集型任务虽然也可以用于多任务完成, 但是任务越多, 花在任务切换的时间也越多, CPU执行任务的效率就会越低. 因此, 要最高效地利用CPU, 计算密集型任务同时进行的数量应该等于CPU的核心数
IO密集型: 适合多线程
设计到网络/磁盘IO的任务都是IO密集型任务, 这类任务特点是CPU消耗很少, 任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度). 对于IO密集型任务, 任务越多, CPU效率越高, 但也有一个限度. 常见的大部分任务都是IO密集型任务, 比如Web任务
进程 对于操作系统而言,一个任务就是一个进程。 进程是系统中程序执行和资源分配的基本单位。每个进程都有自己的数据段、代码段、和堆栈段。
一个进程的举例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 from time import sleepdef run (): while True : print ("this is a run function" ) sleep(1.2 ) if __name__ == "__main__" : while True : print ("this is mian function" ) sleep(1 ) run()
启动多个进程实现多任务 现代操作系统(Windows、Mac OS X、Linux、UNIX等)都支持“多任务”
什么叫多任务??? 操作系统同时可以运行多个任务
单核CPU实现多任务原理:操作系统轮流让各个任务交替执行,QQ执行2us,切换到微信,在执行2us,再切换到微博,执行2us……。表面是看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就行所有任务都在同时执行一样
多核CPU实现多任务原理:真正的秉性执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作西永也会自动把很多任务轮流调度到每个核心上执行 并发:看上去一起执行,任务书多于CPU核心数 并行:真正一起执行,任务数小于等于CPU核心数
实现多任务的方式: 1、多进程模式 2、多线程模式 3、协程模式 4、多进程+多线程模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 ''' multiprocessing 库 跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象 ''' from multiprocessing import Processfrom time import sleepimport osdef run (str ): while True : print ("str %s, current process %s, parrent process %s" %(str , os.getpid(), os.getppid())) sleep(1 ) if __name__ == "__main__" : print ("主(父)进程启动-%s" %(os.getpid())) p = Process(target=run, args=("first" ,)) p.start() while True : print ("main process : while True" ) sleep(1 )
使用 .join() 等待子进程结束后再执行父进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from multiprocessing import Processfrom time import sleepimport osdef run (string_ ): print ("子进程启动" ) sleep(string_) print ("子进程结束" ) if __name__ == "__main__" : print ("父进程启动" ) p = Process(target=run, args=("nice" ,)) p.start() p.join() print ("父进程结束" )
全局变量在多个进程中不能被共享 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from multiprocessing import Processnum = 100 def run (): print ("子进程开始" ) global num num += 1 print (num) print ("子进程结束,num={0}" .format (num)) if __name__ == "__main__" : print ("父进程开始" ) p = Process(target=run) p.start() p.join() print ("父进程结束,num=%d" %num)
使用进程池 启动多个子进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from multiprocessing import Poolimport os, time, randomdef run (name ): print ("子进程%d启动--%s" % (name, os.getpid())) start = time.time() time.sleep(random.choice([1 ,2 ,3 ])) end = time.time() print ("子进程%d结束--%s--耗时%.2f" % (name, os.getpid(), end-start)) if __name__ == "__main__" : print ("父进程启动" ) pp = Pool(2 ) for i in range (6 ): pp.apply_async(run,args=(i,)) pp.close() pp.join() print ("父进程结束" )
举例: 多进程来拷贝文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import os, timefrom multiprocessing import Pooldef copyFile (rPath, wPath ): fr = open (rPath, "rb" ) fw = open (wPath, "wb" ) context = fr.read() fw.write(context) fr.close() fw.close() path = r"D:\file" toPath =r"D:\toFile" if __name__ == "__main__" : filesList = os.listdir(path) start = time.time() pp = Pool(4 ) for fileName in filesList: pp.apply_async(copyFile, args=(os.path.join(path,fileName), os.path.join(toPath,fileName))) pp.close() pp.join() end = time.time() print ("总耗时:%0.2f" % (end-start))
简单的进程间通信_Queue 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from multiprocessing import Process, Queueimport os, timedef write (q ): print ("启动写子进程%s" % (os.getpid())) for chr in ["A" , "B" , "C" , "D" ]: q.put(chr ) time.sleep(1 ) print ("结束写子进程%s" % (os.getpid())) def read (q ): print ("启动读子进程%s" % (os.getpid())) while True : value = q.get(True ) print ("value = " + value) print ("结束读子进程%s" % (os.getpid())) if __name__ == "__main__" : q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate() print ("父进程结束" )
封装进程对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from multiprocessing import Processimport os, timeclass MyProcess (Process ): def __init__ (self,name ): Process.__init__(self ) self .name = name def run (self ): print ("子进程(%s-%s)启动" % (self .name, os.getpid())) time.sleep(3 ) print ("子进程(%s-%s)结束" % (self .name, os.getpid())) myProcess = MyProcess("zyp" ) myProcess.run()
线程 线程 在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”, 我们把进程内的这些“子任务”叫做线程。
线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务, 每一个线程都共享一个进程的资源,共享一个堆栈。
线程是最小的执行单元,而进程由至少一个线程组成。 如何调度进程和线程,完全由操作系统绝对,程序自己不能决定什么时候执行,执行多长时间。
模块 1、_thread模块 低级模块 2、threading模块 高级模块,对_thread进行了封装
启动一个线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import threading, timedef run (num ): print ("子线程(%s)开始" % (threading.current_thread().name)) time.sleep(2 ) print ("num:" , num) time.sleep(2 ) print ("子线程(%s)结束" % (threading.current_thread().name)) if __name__ == "__main__" : print ("主线程(%s)启动" % (threading.current_thread().name)) t = threading.Thread(target=run, name="runThread" , args=(1 ,)) t.start() t.join() print ("主线程(%s)结束" % (threading.current_thread().name))
线程间的共享数据 多线程和多进程最大的不同在于, 多进程中,同一个变量,各自有一份拷贝存在每个进程中,互不影响。 多线程中,所有变量都由所有线程共享。所以,任何一个变量都可以被任意一个线程修改, 因此,线程之间共享数据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threadingnum = 0 def run (n ): global num for i in range (1000000 ): num = num + n num = num - n if __name__ == "__main__" : iTimeStart = time.time() t1 = threading.Thread(target=run, args=(6 ,)) t2 = threading.Thread(target=run, args=(9 ,)) t1.start() t2.start() t1.join() t2.join() print ("num =" ,num) iTimeEnd = time.time() print (iTimeEnd - iTimeStart)
线程锁解决数据混乱 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import threadingimport timelock = threading.Lock() num = 0 def run (n ): global num for i in range (1000000 ): ''' lock.acquire() try: num = num + n num = num - n finally: #修改完一定要释放锁 lock.release() ''' with lock: num = num + n num = num - n if __name__ == "__main__" : iTimeStart = time.time() t1 = threading.Thread(target=run, args=(6 ,)) t2 = threading.Thread(target=run, args=(9 ,)) t1.start() t2.start() t1.join() t2.join() print ("num =" ,num) iTimeEnd = time.time() print (iTimeEnd - iTimeStart)
创建全局的ThreadLocal 对象 创建一个全局的ThreadLocal对象 每个线程有独立的存储空间 每个线程对ThreadLocal对象都可以读写,但是互不影响
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import threadinglocal = threading.local() def process_student (): print ('local.student: %s , current_thread : %s' % (local.student, threading.current_thread().name)) def process_thread (stu_name ): local.student = stu_name process_student() t1 = threading.Thread(target= process_thread, args=('Alice' ,), name='Thread-A' ) t2 = threading.Thread(target= process_thread, args=('Bob' ,), name='Thread-B' ) t1.start() t2.start() t1.join() t2.join() ''' 全局变量local就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。 你可以把local看成全局变量,但每个属性如local.student都是线程的局部变量, 可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。 可以理解为全局变量local是一个dict,不但可以用local.student,还可以绑定其他变量, 如local.teacher等等。 应用:ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等, 这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。 '''
小结 一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。 ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import threading, timelocal = threading.local() num = 0 def run (x, n ): x = x + n x = x - n return x def func (n ): local.x = num for i in range (1000000 ): run(local.x, n) print ("%s- local.x =%d" %(threading.current_thread().name, local.x)) if __name__ == "__main__" : iTimeStart = time.time() t1 = threading.Thread(target=func, args=(6 ,)) t2 = threading.Thread(target=func, args=(9 ,)) t1.start() t2.start() t1.join() t2.join() print ("num =" ,num) iTimeEnd = time.time() print (iTimeEnd - iTimeStart)
定时线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import threading, timedef run (): print ("sunck is a good man" ) t = threading.Timer(3 , run) t.start() t1 = time.time() t.join() t2 =time.time() print ("父线程结束" )print (t2 - t1)
线程通信 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threading, timedef func (): event = threading.Event() def run (): for i in range (5 ): event.wait() print ("sunck is a good man!!%d" %i) t = threading.Thread(target=run).start() return event e = func() for i in range (5 ): time.sleep(2 ) e.set ()
生产者与消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import threading,queue,time,randomdef product (id , q ): while True : num = random.randint(0 , 10000 ) q.put(num) print ("生产者%d生产了%d数据放入了队列" % (id , num)) time.sleep(3 ) q.task_done() def customer (id , q ): while True : item = q.get() if item is None : break print ("消费者%d消费了%d数据" % (id , item)) time.sleep(2 ) q.task_done() if __name__ == "__main__" : q = queue.Queue() for i in range (4 ): threading.Thread(target=product, args=(i,q)).start() for i in range (3 ): threading.Thread(target=customer, args=(i,q)).start()
协程 子程序/函数:在所有语言中都是层级调用,比如A调用B,在B执行的过程中又可以调用C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。是通过栈实现的,一个线程就是执行一个子程序,子程序调用总是一个入口,一次返回,调用的顺序是明确的
概述:看上去也是子程序,但执行过程中,在子程序的内部可中断,然后转而执行别的子程序,不是函数调用。
协程举例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def run (): print ('start' ) print (1 ) yield 10 print (2 ) yield 20 print (3 ) yield 30 m = run() next (m) next (m) next (m)
数据传输 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def run (): data = "" print (0 , data) r = yield data print (1 , r, data) r = yield "aa" print (2 , r, data) r = yield "bb" print (3 , r, data) r = yield "cc" m = run() m.send(None ) print ("-----------" )print (1 ,m.send("a" ))print ("-----------" )print (2 ,m.send("b" ))print ("-----------" )print (3 ,m.send("c" ))
生产者与消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def product (c ): c.send(None ) for i in range (5 ): print ("生产者产生数据%d" %i) r = c.send(str (i)) print ("消费者消费了数据%s" %r) c.close() def customer (): data = "" while True : n = yield data if not n: return print ("消费者消费了%s" %n) data = "200" c = customer() product(c)