Python中线程有两种方式:函数或者用类来包装线程对象。threading模块中包含了丰富的多线程支持功能:
通过Thread类来处理线程,类中提供的一些方法:
通过Thread直接构造线程,然后通过start方法启动线程:
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)
各参数说明:
def simpleRoutine(name, delay): print(f"routine {name} starting...") time.sleep(delay) print(f"routine {name} finished") if __name__ == '__main__': thrOne = threading.Thread(target=simpleRoutine, args=("First", 1)) thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2)) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
直接继承Thread,创建一个新的子类(主要实现run方法):
class SimpleThread (threading.Thread): def __init__(self, name, delay): # threading.Thread.__init__(self) super().__init__() self.name = name self.delay = delay def run(self): print(f"thread {self.name} starting...") time.sleep(self.delay) print(f"thread {self.name} finished") if __name__ == '__main__': thrOne = SimpleThread("First", 2) thrTwo = SimpleThread("Second", 2) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
当多个线程同时修改同一条数据时可能会出现脏数据;所以,就需要线程锁,即同一时刻只允许一个线程执行操作。
threading提供了Lock和RLock(可重入锁)两个类,它们都提供了如下两个方法来加锁和释放锁:
两种使用锁的方式:
gCount = 0 def PlusOne(locker): global gCount with locker: gCount += 1、 def MinusOne(locker): global gCount if locker.acquire(): gCount -= 1 locker.release()
Condition对象内部维护了一个锁(构造时可传递一个Lock/RLock对象,否则内部会自行创建一个RLock)和一个waiting池:
Condition对象:
__init__(self,lock=None)
:Condition类总是与一个锁相关联(若不指定lock参数,会自动创建一个与之绑定的RLock对象);
acquire(timeout)
:调用关联锁的acquire()方法;
release()
:调用关联锁的release()方法
wait(timeout)
:线程挂起,直到收到一个notify通知或超时才会被唤醒;必须在已获得锁的前提下调用;
notify(n=1)
:唤醒waiting池中的n个正在等待的线程并通知它:
notify_all()
:通知所有线程。
class Producer(threading.Thread): def __init__(self, cond, storage): threading.Thread.__init__(self) self.cond = cond self.storage = storage def run(self): label = 1 while True: with self.cond: if len(self.storage) < 10: self.storage.append(label) print(f"<- Produce {label} product") label += 1 self.cond.notify(2) else: print(f"<- storage full: Has Produced {label - 1} product") self.cond.notify_all() self.cond.wait() time.sleep(0.4) class Consumer(threading.Thread): def __init__(self, name, cond, storage): threading.Thread.__init__(self) self.name = name self.cond = cond self.storage = storage def run(self): while True: if self.cond.acquire(): if len(self.storage) > 1: pro = self.storage.pop(0) print(f"-> {self.name} consumed {pro}") self.cond.notify() else: print(f"-> {self.name} storage empty: no product to consume") self.cond.wait() self.cond.release() time.sleep(1)
信号量对象内部维护一个计数器:
acquire(blocking=True,timeout=None)
时减1,当计数为0就阻塞请求的线程;release()
时加1,当计数大于0恢复被阻塞的线程;threading中有Semaphore和BoundedSemaphore两个信号量;BoundedSemaphore限制了release的次数,任何时候计数器的值,都不不能大于初始值(release时会检测计数器的值,若大于等于初始值,则抛出ValueError异常)。
通过Semaphore维护生产(release一个)、消费(acquire一个)量:
# products = threading.Semaphore(0) def produceOne(label, sem: threading.Semaphore): sem.release() print(f"{label} produce one") def consumeOne(label, sem: threading.Semaphore): sem.acquire() print(f"{label} consume one")
通过BoundedSemaphore来控制并发数量(最多有Semaphore初始值数量的线程并发):
# runner = threading.BoundedSemaphore(3) def runBound(name, sem: threading.BoundedSemaphore): with sem: print(f"{name} is running") time.sleep(1) print(f"{name} finished")
事件对象内部有个标志字段,用于线程等待事件的发生:
多线程等待事件发生,然后开始执行:
def waiters(name, evt: threading.Event): evt.wait() print(f"{name} is running") time.sleep(1) print(f"{name} finished") def starting(evt: threading.Event): evt.set() print("event is set")
屏障用于设定等待线程数量,当数量达到指定值时,开始执行:
threading.Barrier(parties, action=None, timeout=None)
屏障属性与方法:
def waitBarrier(name, barr: threading.Barrier): print(f"{name} waiting for open") try: barr.wait() print(f"{name} running") time.sleep(5) except threading.BrokenBarrierError: print(f"{name} exception") print(f"{name} finished")
GIL(Global Interpreter Lock,全局解释器锁);cpython中,某个线程想要执行,必须先拿到GIL(可以把GIL看作是“通行证”)。每次释放GIL锁,线程都要进行锁竞争,切换线程,会消耗资源。
由于GIL锁的存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程),这就是为什么在多核CPU上,python的多线程效率并不高:
python在使用多线程的时候,调用的是c语言的原生线程: