classFetchUrls(threading.Thread):...def__init__(self,urls,output,lock):...self.lock=lock#传入的lock对象defrun(self):...whileself.urls:...self.lock.acquire()#获得lock对象,lock状态变为locked,并且阻塞其他线程获取lock对象(写文件的权利)print'lock acquired by %s'%self.nameself.output.write(d.read())print'write done by %s'%self.nameprint'lock released by %s'%self.nameself.lock.release()#释放lock对象,lock状态变为unlocked,其他的线程可以重新获取lock对象...defmain():...lock=threading.Lock()...t1=FetchUrls(urls1,f,lock)t2=FetchUrls(urls2,f,lock)...
以下是程序的输出:
$ python locks.py
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.youtube.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.facebook.com fetched by Thread-1
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.yahoo.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.google.com fetched by Thread-1
intPyThread_acquire_lock(PyThread_type_locklock,intwaitflag){...do{if(waitflag)status=fix_status(sem_wait(thelock));elsestatus=fix_status(sem_trywait(thelock));}while(status==EINTR);/* Retry if interrupted by a signal */....}
classFetchUrls(threading.Thread):...defrun(self):...whileself.urls:...withself.lock:#使用“with”语句管理锁的获取和释放print'lock acquired by %s'%self.nameself.output.write(d.read())print'write done by %s'%self.nameprint'lock released by %s'%self.name...
class Producer(threading.Thread):
    """Produce random integers and append them to a shared list."""

    def __init__(self, integers, condition):
        """Store the shared list and the condition object guarding it.

        @param integers   list that receives the generated integers
        @param condition  condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """Thread body: append one random integer, notify, sleep, repeat.

        Loops forever; each iteration appends an integer in [0, 256] to the
        list while holding the condition, notifies one waiter, then sleeps
        for one second.
        """
        while True:
            integer = random.randint(0, 256)
            self.condition.acquire()  # take the condition lock
            try:
                print('condition acquired by %s' % self.name)
                self.integers.append(integer)
                print('%d appended to list by %s' % (integer, self.name))
                print('condition notified by %s' % self.name)
                self.condition.notify()  # wake up a waiting consumer thread
                print('condition released by %s' % self.name)
            finally:
                # Release the condition lock even if something above raised,
                # so other threads are never left blocked on a leaked lock.
                self.condition.release()
            time.sleep(1)  # pause for one second
class Consumer(threading.Thread):
    """Consume integers from a shared list."""

    def __init__(self, integers, condition):
        """Store the shared list and the condition object guarding it.

        @param integers   list the integers are taken from
        @param condition  condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """Thread body: pop one integer per iteration, waiting when empty.

        Loops forever; each iteration holds the condition, waits on it until
        the list is non-empty, pops one integer and releases the condition.
        """
        while True:
            self.condition.acquire()  # take the condition lock
            try:
                print('condition acquired by %s' % self.name)
                while True:
                    if self.integers:  # is there anything to consume?
                        integer = self.integers.pop()
                        print('%d popped from list by %s'
                              % (integer, self.name))
                        break
                    print('condition wait by %s' % self.name)
                    # wait() gives up the lock while blocked and holds it
                    # again when it returns, so the re-check above is safe.
                    self.condition.wait()
                print('condition released by %s' % self.name)
            finally:
                # Release the condition lock even if something above raised.
                self.condition.release()
$ python condition.py
condition acquired by Thread-1
159 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
159 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
116 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
116 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
classProducer(threading.Thread):...defrun(self):whileTrue:integer=random.randint(0,256)withself.condition:print'condition acquired by %s'%self.nameself.integers.append(integer)print'%d appended to list by %s'%(integer,self.name)print'condition notified by %s'%self.nameself.condition.notify()print'condition released by %s'%self.nametime.sleep(1)classConsumer(threading.Thread):...defrun(self):whileTrue:withself.condition:print'condition acquired by %s'%self.namewhileTrue:ifself.integers:integer=self.integers.pop()print'%d popped from list by %s'%(integer,self.name)breakprint'condition wait by %s'%self.nameself.condition.wait()print'condition released by %s'%self.name
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks release() is not called more often than acquire()."""

    def __init__(self, value=1, verbose=None):
        """Initialise the base semaphore and remember the starting value.

        @param value    initial value, also used as the upper bound
        @param verbose  passed through unchanged to _Semaphore.__init__
        """
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        """Release the semaphore, refusing to go above the initial value.

        Raises ValueError when the current value is already at (or above)
        the value the semaphore was created with.
        """
        # _Semaphore__value is the name-mangled spelling of the base class's
        # private __value attribute (presumably the current count -- the base
        # class is not shown here).
        if self._Semaphore__value >= self._initial_value:
            raise ValueError("Semaphore released too many times")
        return _Semaphore.release(self)
class Producer(threading.Thread):
    """Produce random integers and signal each one through an event."""

    def __init__(self, integers, event):
        """Store the shared list and the event used for signalling.

        @param integers  list that receives the generated integers
        @param event     event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event

    def run(self):
        """Thread body: append one random integer, pulse the event, sleep.

        Loops forever; each iteration appends an integer in [0, 256] to the
        list, sets and immediately clears the event, then sleeps one second.
        """
        while True:
            integer = random.randint(0, 256)
            self.integers.append(integer)
            print('%d appended to list by %s' % (integer, self.name))
            print('event set by %s' % self.name)
            self.event.set()    # set the event
            # Clear it again right away so the next wait() blocks.
            # NOTE(review): only threads already blocked in wait() are woken
            # by this set()/clear() pulse -- confirm that is acceptable.
            self.event.clear()
            print('event cleared by %s' % self.name)
            time.sleep(1)
class Consumer(threading.Thread):
    """Consume integers from a shared list when an event is signalled."""

    def __init__(self, integers, event):
        """Store the shared list and the event used for signalling.

        @param integers  list the integers are taken from
        @param event     event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event

    def run(self):
        """Thread body: wait for the event, then try to pop one integer.

        Loops forever. If the list turns out to be empty after the wait,
        the thread sleeps for one second before waiting again.
        """
        while True:
            self.event.wait()  # block until the event is triggered
            try:
                integer = self.integers.pop()
            except IndexError:
                # pop() on an empty list: nothing to consume, back off
                time.sleep(1)
            else:
                print('%d popped from list by %s' % (integer, self.name))
$ python event.py
124 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
124 popped from list by Thread-2
223 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
223 popped from list by Thread-2
def task_done(self):
    """Record that one unfinished task has been completed.

    Decrements ``unfinished_tasks`` while holding the ``all_tasks_done``
    condition. When the count reaches zero, every thread waiting on that
    condition is woken.

    Raises ValueError if the count would drop below zero; the stored count
    is left unchanged in that case.
    """
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished < 0:
            raise ValueError('task_done() called too many times')
        if unfinished == 0:
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished

def join(self):
    """Block until ``unfinished_tasks`` has dropped to zero."""
    with self.all_tasks_done:
        # Re-check after every wake-up: wait() returning does not by itself
        # mean the count is zero.
        while self.unfinished_tasks:
            self.all_tasks_done.wait()