2008-01-04

生产者消费者模型中生产者的速度快于消费者时所产生的问题及其解决方法讨论

Views: 11572 | Add Comments

在生产者消费者模型中, 如果生产者生产产品(在下文中, 和读者写者模型中的信息是同一个概念)的速度 P 快于消费者消费产品的速度 C, 那么当时间趋于无限时, 待处理的产品的数量也将是无限多. 在带有缓冲区的生产者消费者模型中, 这意味着缓冲区将被挤爆, 导致系统崩溃.

下面两个条件可能导致系统的崩溃:

  1. 缓冲区是有限的.
  2. 缓冲区中的产品是有时效的.

这两个条件是任何系统固有的, 不能消除. 所以为了避免系统崩溃, 可有以下的处理方法:

  1. 在系统可接受的范围内丢弃多出的产品, 使生产者的速度宏观上和消费者的速度相等.
  2. 实现一种抑制生产者生产速度的机制, 使生产者的速度和消费者的速度相等.
  3. 增加消费者的数量, 使其为至少 P/C.

本文讨论的是以第 3 种为主要解决方法, 前两种为辅助解决方法时的情况. 消费者数量是有限制的, 因为消费者本身仍然需要消费其它资源.

消费者有两个队列: 等待队列(WaitQueue)和工作队列(WorkQueue). 等待队列中包含当前系统中正在等待产品的消费者. 工作队列中包含当前系统中正在处理产品的消费者.

用信号量来实现的伪代码:

// 运行中的 BeginWait 的实例上限.
int MAX_WAIT = 1;
Semaphore waitSemaphore = new Semaphore(MAX_WAIT, MAX_WAIT);
// 运行中的 DoWork 的实例上限.
int MAX_WORK = 3;
Semaphore workSemaphore = new Semaphore(MAX_WORK, MAX_WORK);

void BeginWait(callback){
    // 该方法将回调函数注册到某个系统中, 一旦等待的信号出现, 这个系统将调用回调函数.
    // 例如, 可以利用操作系统的信号机制.
    // 无论如何, 该方法的实现必须是几乎"立即"返回的.
}

// 本方法的实例上限 = min(MAX_WORK * 2 - 1, MAX_WORK + MAX_WAIT - 1)
void EndWait() {
    waitSemaphore.Release();    
    DoWork();
    if (waitSemaphore.WaitOne(0, false)) {
        BeginWait(this.EndWait);
    }
}

void DoWork(){
    workSemaphore.WaitOne();
    if (workSemaphore.WaitOne(0, false)) {
        if (waitSemaphore.WaitOne(0, false)) {
            BeginWait(this.EndWait);
        }
        workSemaphore.Release();
    }
    // 进行操作, 花费一些时间...
    workSemaphore.Release();
}

因为注册一个回调函数的花费可以忽略不计, 所以建议 MAX_WAIT 的值不要大于 1. 由于算法本身的实现, MAX_WAIT 大于 MAX_WORK 时和等于时的效果是相同的.

完整的可运行的 C# 源代码

下面的代码需要在 C# 编译器 8, 和 .Net 框架 2 下编译和运行. 运行它, 你可以观察到生产者和消费者的实例数目.

using System;
using System.Threading;

class Test
{
    public static void Main(){
        Class1 c = new  Class1();
        c.Run();
    }
}

class Class1
{
    private Random rand = new Random();
    private const int MAX_WAIT = 1;
    private Semaphore waitSemaphore = new Semaphore(MAX_WAIT, MAX_WAIT);
    private const int MAX_WORK = 6;
    private Semaphore workSemaphore = new Semaphore(MAX_WORK, MAX_WORK);
    // 将 Interlocked.Increment(ref num) 放到一段代码的开始处,
    // 同时将 Interlocked.Decrement(ref num); 放到这段代码的结束处,
    // 你将可以看到这段代码在操作系统中的运行实例数目.        
    private int num = 0;

    public void Run() {
        waitSemaphore.WaitOne();
        BeginWait(this.EndWait);
        Console.WriteLine("Started");
        while(true){
            Thread.Sleep(500);
            Console.WriteLine(
                "num={0,2}, working={1,3}. waiting={2,3}. maxWorkerId={3,3}, maxWaitId={4,3}",
                num, maxWorkerId - minWorkerId, maxWaitId - minWaitId, maxWorkerId, maxWaitId);
        }
    }

    private void EndWait(IAsyncResult ar) {
        Interlocked.Increment(ref num);
        waitSemaphore.Release();
        DoWork();
        if (waitSemaphore.WaitOne(0, false)) {
            BeginWait(this.EndWait);
        }
        Interlocked.Decrement(ref num);
    }

    // 工作中的消费者的标识.
    private int maxWorkerId = 0;
    private int minWorkerId = 0;

    private void DoWork() {
        Interlocked.Increment(ref maxWorkerId);        
        workSemaphore.WaitOne();
        // 应该实现一种计数锁. SemaphoreLock?
        if (workSemaphore.WaitOne(0, false)) {
            if(waitSemaphore.WaitOne(0, false)){
                BeginWait(this.EndWait);
            }
            workSemaphore.Release();
        }
        Thread.Sleep(rand.Next(2000) + 2000);
        workSemaphore.Release();
        Interlocked.Increment(ref minWorkerId);
    }

    delegate void WaitDelegate();

    private void BeginWait(AsyncCallback callback) {
        WaitDelegate wait = this.Wait;
        // 使用异步委托, 所以 BeginWait 应该会立即返回.
        wait.BeginInvoke(callback, null);
    }

    // 等待中的消费者标识.
    private int maxWaitId = 0;
    private int minWaitId = 0;

    private void Wait() {
        Interlocked.Increment(ref maxWaitId);
        Thread.Sleep(rand.Next(2000) + 200);
        Interlocked.Increment(ref minWaitId);
    }

}

2008-01-08 更新

Related posts:

  1. C#环形缓冲
  2. C# 中实现 FIFO 缓冲区–ArrayBuffer
  3. C#封装log4net
  4. 流式布局的原理和代码实现
  5. C# 版的 SimpleXML
Posted by ideawu at 2008-01-04 11:04:23

Leave a Comment