使用 Condition 实现简单的队列

时间:2020-09-13 15:28:40   收藏:0   阅读:47

使用 Condition 实现简单的队列

队列特点

队列有以下特点:

如果要在多线程中使用,还要满足:

使用 Condition 实现简单的队列

from threading import Condition, Thread, Lock

class MyQueue:
    def __init__(self, cap):
        self.cap = cap
        lock = Lock()
        self.not_full = Condition(lock)
        self.not_empty = Condition(lock)
        self.container = []

    def put(self, item):
        with self.not_full:
            if len(self.container) >= self.cap:
                # 阻塞直到其它线程调用self.not_full.notify
                self.not_full.wait()
            print(‘put‘, item)
            self.container.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:
            if len(self.container) <= 0:
                # 阻塞直到其它线程调用self.not_empty.notify
                self.not_empty.wait()
            item = self.container.pop(0)
            print(‘get‘, item)
            self.not_full.notify()
            return item

测试

import time

q = MyQueue(5)


def consumer():
    while True:
        time.sleep(1)
        q.get()


def producer(name):
    num = 0
    while True:
        num += 1
        item = f"{name}_{num}"
        q.put(item)
        time.sleep(1)


if __name__ == "__main__":
    c1 = Thread(target=consumer)
    p1 = Thread(target=producer, args=(‘p1‘,))
    p2 = Thread(target=producer, args=(‘p2‘,))
    for t in (c1, p1, p2):
        t.start()

Conditon 实现

Conditon 是用 Lock 实现的.

关键方法如下:

Demo 如下:

from threading import Lock
from concurrent.futures import ThreadPoolExecutor
import time


class Condition:
    def __init__(self):
        self.lock = None

    def wait(self):
        self.lock = Lock()
        self.lock.acquire()
        self.lock.acquire()

    def notify(self):
        self.lock.release()


cond = Condition()
q = []


def t1():
    while True:
        if len(q) == 0:
            cond.wait()
        item = q.pop(0)
        print(f‘get {item} from queue‘)


def t2():
    for i in range(5):
        print(f‘put {i} to queue‘)
        q.append(i)
        cond.notify()
        time.sleep(1)


if __name__ == ‘__main__‘:
    with ThreadPoolExecutor() as e:
        e.submit(t1)
        e.submit(t2)

结果:

put 0 to queue
get 0 from queue
put 1 to queue
get 1 from queue
put 2 to queue
get 2 from queue
put 3 to queue
get 3 from queue
put 4 to queue
get 4 from queue

Condition 的实际实现比 Demo 中要复杂的多,但基本原理确是相同的,其中一个关键点就是release 锁和 acquire 锁不一定是同一个线程,所以在下面的例子中是不会造成死锁的.

from threading import Lock
import time
from concurrent.futures import ThreadPoolExecutor

l = Lock()


def t1():
    l.acquire()
    l.acquire()
    print(‘t1‘)


def t2():
    time.sleep(1)
    l.release()


if __name__ == ‘__main__‘:
    with ThreadPoolExecutor() as e:
        e.submit(t1)
        e.submit(t2)

原文:https://www.cnblogs.com/aloe-n/p/13660953.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!