在生产者消费者模型中, 如果生产者生产产品(在下文中, 和读者写者模型中的信息是同一个概念)的速度 P 快于消费者消费产品的速度 C, 那么当时间趋于无限时, 待处理的产品的数量也将是无限多. 在带有缓冲区的生产者消费者模型中, 这意味着缓冲区将被挤爆, 导致系统崩溃.
下面两个条件可能导致系统的崩溃:
- 缓冲区是有限的.
- 缓冲区中的产品是有时效的.
这两个条件是任何系统固有的, 不能消除. 所以为了避免系统崩溃, 可有以下的处理方法:
- 在系统可接受的范围内丢弃多出的产品, 使生产者的速度宏观上和消费者的速度相等.
- 实现一种抑制生产者生产速度的机制, 使生产者的速度和消费者的速度相等.
- 增加消费者的数量, 使其为至少 P/C.
本文讨论的是以第 3 种为主要解决方法, 前两种为辅助解决方法时的情况. 消费者数量是有限制的, 因为消费者本身仍然需要消费其它资源.
消费者有两个队列: 等待队列(WaitQueue)和工作队列(WorkQueue). 等待队列中包含当前系统中正在等待产品的消费者. 工作队列中包含当前系统中正在处理产品的消费者.
用信号量来实现的伪代码:
// 运行中的 BeginWait 的实例上限. int MAX_WAIT = 1; Semaphore waitSemaphore = new Semaphore(MAX_WAIT, MAX_WAIT); // 运行中的 DoWork 的实例上限. int MAX_WORK = 3; Semaphore workSemaphore = new Semaphore(MAX_WORK, MAX_WORK); void BeginWait(callback){ // 该方法将回调函数注册到某个系统中, 一旦等待的信号出现, 这个系统将调用回调函数. // 例如, 可以利用操作系统的信号机制. // 无论如何, 该方法的实现必须是几乎"立即"返回的. } // 本方法的实例上限 = min(MAX_WORK * 2 - 1, MAX_WORK + MAX_WAIT - 1) void EndWait() { waitSemaphore.Release(); DoWork(); if (waitSemaphore.WaitOne(0, false)) { BeginWait(this.EndWait); } } void DoWork(){ workSemaphore.WaitOne(); if (workSemaphore.WaitOne(0, false)) { if (waitSemaphore.WaitOne(0, false)) { BeginWait(this.EndWait); } workSemaphore.Release(); } // 进行操作, 花费一些时间... workSemaphore.Release(); }
因为注册一个回调函数的花费可以忽略不计, 所以建议 MAX_WAIT 的值不要大于 1. 由于算法本身的实现, MAX_WAIT 大于 MAX_WORK 时和等于时的效果是相同的.
完整的可运行的 C# 源代码
下面的代码需要在 C# 编译器 8, 和 .Net 框架 2 下编译和运行. 运行它, 你可以观察到生产者和消费者的实例数目.
using System; using System.Threading; class Test { public static void Main(){ Class1 c = new Class1(); c.Run(); } } class Class1 { private Random rand = new Random(); private const int MAX_WAIT = 1; private Semaphore waitSemaphore = new Semaphore(MAX_WAIT, MAX_WAIT); private const int MAX_WORK = 6; private Semaphore workSemaphore = new Semaphore(MAX_WORK, MAX_WORK); // 将 Interlocked.Increment(ref num) 放到一段代码的开始处, // 同时将 Interlocked.Decrement(ref num); 放到这段代码的结束处, // 你将可以看到这段代码在操作系统中的运行实例数目. private int num = 0; public void Run() { waitSemaphore.WaitOne(); BeginWait(this.EndWait); Console.WriteLine("Started"); while(true){ Thread.Sleep(500); Console.WriteLine( "num={0,2}, working={1,3}. waiting={2,3}. maxWorkerId={3,3}, maxWaitId={4,3}", num, maxWorkerId - minWorkerId, maxWaitId - minWaitId, maxWorkerId, maxWaitId); } } private void EndWait(IAsyncResult ar) { Interlocked.Increment(ref num); waitSemaphore.Release(); DoWork(); if (waitSemaphore.WaitOne(0, false)) { BeginWait(this.EndWait); } Interlocked.Decrement(ref num); } // 工作中的消费者的标识. private int maxWorkerId = 0; private int minWorkerId = 0; private void DoWork() { Interlocked.Increment(ref maxWorkerId); workSemaphore.WaitOne(); // 应该实现一种计数锁. SemaphoreLock? if (workSemaphore.WaitOne(0, false)) { if(waitSemaphore.WaitOne(0, false)){ BeginWait(this.EndWait); } workSemaphore.Release(); } Thread.Sleep(rand.Next(2000) + 2000); workSemaphore.Release(); Interlocked.Increment(ref minWorkerId); } delegate void WaitDelegate(); private void BeginWait(AsyncCallback callback) { WaitDelegate wait = this.Wait; // 使用异步委托, 所以 BeginWait 应该会立即返回. wait.BeginInvoke(callback, null); } // 等待中的消费者标识. private int maxWaitId = 0; private int minWaitId = 0; private void Wait() { Interlocked.Increment(ref maxWaitId); Thread.Sleep(rand.Next(2000) + 200); Interlocked.Increment(ref minWaitId); } }
2008-01-08 更新