• 2009-06-12

    一对多关系的不稳定性

    Views: 26091 | No Comments

    经典的对象关系模型有三个: 一对一(one-to-one), 一对多(one-to-many), 多对多(many-to-many). 事实上, 一对多只是一种游离的中间状态, 任何设计中一旦使用了一对多关系, 迟早会发现只有多对多才能满足业务的变更. 对象关系要么是一对一, 要么是多对多, 没有一对多.

    以博客程序作为例子, 想当然地, 认为博客作者只能是一个人, 所以用户和博客文章的关系是一对多的. 但是, 如果不把博客当为个人的日记本来使用, 把博客当做文章发布平台, 这时, 这个模型就不能使用了, 因为文章的作者是可以有多人的.

    这只是其中一个例子, 你可能认为有真正的文章发布平台, 不应该把博客程序当做文章发布系统. 想想, 人们会本能地认为一个部门只能属于一个公司, 一个小组只能属于一个部门, 但是, 多个公司合办的交流部门呢? 两个部门合作的小组呢? 这种例子数不胜数.

    根据我的经验, 一旦有人设计对象关系模型(如数据库设计)时使用出了一对多关系模型, 我们应该像看到了想抢我们饭碗的人一样警惕起来: 真的是一对多关系吗? 就跟我们问一个从天而降的不明类人生物: 你是地球人吗?

    Posted by ideawu at 2009-06-12 18:24:04 Tags: ,
  • 2009-05-28

    基于列的数据库

    Views: 13873 | No Comments

    模糊想法, 请忽略.

    基于行的数据库可以通过一对多关系的表很容易地表示树形结构的对象. 但是, 基于列的数据库, 如何表示呢?

    假设有一个对象实例, 它的某个字段是其它类型对象的列表. 这时, 如何用列的实例表示这个对象?

    首先, 基于列的数据库, 任何对象的实例都可以用一个id列表, 该id一般设计为整数.

    列实例(称为字段)不直接声明自己属于哪一个对象, 通过单独的存储对象和列实例的关系来表示原始对象(raw), 这种对象的每一个字段都是原始数据类型.

    如果对象的某个字段是其它对象的列表, 如何表示?

    这里使用对象与对象的关系表来存储.

    Continue reading »

    Posted by ideawu at 2009-05-28 12:44:57
  • 2009-03-11

    关于每一个数据库表都应该有一个单一的字段作为主键的讨论

    Views: 18882 | 3 Comments

    2010年5月6日更新: 只有真正懂得了这个道理的人, 才算真正理解了关系数据库. 如何才算懂得了这个道理? - 即使你有一百个理由要用关联主键, 你也能找到这唯一的一个理由放弃, 改而使用单一字段做主键.

    ------

    在数据库设计中, 每一个表都应该有一个字段作为主键. 这个字段一般是自增整数字段, 或者某些数据库支持的自动产生不重复字符串的字段, 也可以是程序自己产生的唯一标识. 总之, 每一个数据库表都应该有一个单一的字段作为主键.

    使用单一的字段作为表的主键, 有许多优点. 最重要的一条是可以通过一个原子属性(如整数, 字符串)来标识一行记录. 一旦可以方便地标识一行记录, 那么数据的查询, 更新, 删除也都非常简单.

    如果一个表没有主键, 那么必须通过一行记录本身才能标识一行记录. 也就是, 你必须知道一行记录的每一个字段的值, 才能在 SQL 语句中操作这条记录.
    Continue reading »

    Posted by ideawu at 2009-03-11 20:03:07
  • 2009-02-20

    C#环形缓冲

    Views: 13119 | No Comments
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;
    
    /**
     * 缓冲区的可能状态:
     * <code>
     * (1)
     * ----====== data ======-----rspace----
     *     |                 |             |
     *     rd_nxt            wr_nxt        capacity-1
     * (2)
     * ==ldata==-------------==== rdata ====
     *          |            |             |
     *          wr_nxt       rd_nxt        capacity-1
     * (3)
     * ===ldata=============rdata===========(full of data)
     *             |
     *             wr_nxt(rd_nxt)
     * (4)
     * -------------------------------------(empty)
     *           |
     *           wr_nxt(rd_nxt)
     * </code>
     */
    
    /// <summary>
    /// 使用字节数组来实现的缓冲区. 该缓冲区把该数组看作是一个环,
    /// 支持在一块固定的数组上的无限次读和写, 数组的大小不会自动变化.
    /// ideawu
    /// </summary>
    /// <typeparam name="T">所缓冲的数据类型.</typeparam>
    public class ArrayBuffer<T>
    {
    	/// <summary>
    	/// 默认大小.
    	/// </summary>
    	private const int DFLT_SIZE = 512 * 1024;
    
    	/// <summary>
    	/// 缓冲区还能容纳的元素数目.
    	/// </summary>
    	private int space = 0;
    
    	/// <summary>
    	/// 缓冲区中的数据元素数目.
    	/// </summary>
    	private int available = 0;
    
    	/// <summary>
    	/// 缓冲区的容量.
    	/// </summary>
    	private int capacity = DFLT_SIZE;
    	// 注意 capacity 和 buf.Length 可以不相同, 前者小于或者等于后者.
    
    	/// <summary>
    	/// 下一次要将数据写入缓冲区的开始下标.
    	/// </summary>
    	private int wr_nxt = 0;
    
    	/// <summary>
    	/// 下一次读取接收缓冲区的开始下标.
    	/// </summary>
    	private int rd_nxt = 0;
    
    	private int readTimeout = -1;
    
    	private int writeTimeout = -1;
    
    	private Semaphore writeSemaphore = new Semaphore(1, 1);
    
    	/// <summary>
    	/// 缓冲区所使用的数组.
    	/// </summary>
    	private T[] dataBuf;
    
    	private object bufLock = new object();
    
    	/// <summary>
    	/// 如果当前缓冲区中有数据可读, 它将会被设置.
    	/// </summary>
    	private Semaphore readSemaphore = new Semaphore(0, 1);
    
    	/// <summary>
    	/// 创建一个具体默认容量的缓冲区.
    	/// </summary>
    	public ArrayBuffer()
    		: this(DFLT_SIZE) {
    	}
    
    	/// <summary>
    	/// 创建一个指定容量的缓冲区.
    	/// </summary>
    	/// <param name="capacity">缓冲区的容量.</param>
    	public ArrayBuffer(int capacity)
    		: this(new T[capacity]) {
    	}
    
    	/// <summary>
    	/// 使用指定的数组来创建一个缓冲区.
    	/// </summary>
    	/// <param name="buf">缓冲区将要使用的数组.</param>
    	public ArrayBuffer(T[] buf)
    		: this(buf, 0, 0) {
    	}
    
    	/// <summary>
    	/// 使用指定的数组来创建一个缓冲区, 且该数组已经包含数据.
    	/// </summary>
    	/// <param name="buf">缓冲区将要使用的数组.</param>
    	/// <param name="offset">数据在数组中的偏移.</param>
    	/// <param name="size">数据的字节数.</param>
    	public ArrayBuffer(T[] buf, int offset, int size) {
    		this.dataBuf = buf;
    		capacity = buf.Length;
    		available = size;
    		space = capacity - available;
    		rd_nxt = offset;
    		wr_nxt = offset + size;
    	}
    
    	/// <summary>
    	/// 缓冲区还能容纳的元素数目.
    	/// </summary>
    	public int Space {
    		get {
    			return space;
    		}
    	}
    
    	/// <summary>
    	/// 缓冲区中可供读取的数据的元素数目
    	/// </summary>
    	public int Available {
    		get {
    			return available;
    		}
    	}
    
    	/// <summary>
    	/// get, set 接收缓冲区的大小(元素数目). 默认值为 512K.
    	/// Capacity 不能设置为小于 Available 的值(实现会忽略这样的值).
    	/// </summary>
    	public int Capacity {
    		get {
    			return capacity;
    		}
    		set {
    			lock (bufLock) {
    				if (value < available || value == 0) {
    					return;
    					//throw new ApplicationException("Capacity must be larger than Available.");
    				}
    				if (value == capacity) {
    					return;
    				}
    				if (value > capacity && space ==0) {
    					// 可写空间变为非空, 释放可写信号.
    					writeSemaphore.Release();
    				}
    
    				T[] buf = new T[value];
    				if (available > 0) {
    					available = ReadData(buf, 0, buf.Length);
    					// 下面的用法是错误的!
    					//available = Read(buf, 0, buf.Length);
    				}
    				dataBuf = buf;
    				capacity = value;
    				space = capacity - available;
    				rd_nxt = 0;
    				// 当容量缩小时, 可能导致变化后可写空间为0, 这时wr_nxt=0.
    				wr_nxt = (space == 0) ? 0 : available;
    			}
    		}
    	}
    
    	/// <summary>
    	/// Read 方法的超时时间(单位毫秒). 默认为 -1, 表示无限长.
    	/// </summary>
    	public int ReadTimeout {
    		get {
    			return readTimeout;
    		}
    		set {
    			readTimeout = value;
    		}
    	}
    
    	/// <summary>
    	/// Write 方法的超时时间(单位毫秒). 默认为 -1, 表示无限长.
    	/// </summary>
    	public int WriteTimeout {
    		get {
    			return writeTimeout;
    		}
    		set {
    			writeTimeout = value;
    		}
    	}
    
    	/// <summary>
    	/// 清空本缓冲区.
    	/// </summary>
    	public void Clear() {
    		lock (bufLock) {
    			available = 0;
    			space = capacity;
    			rd_nxt = 0;
    			wr_nxt = 0;
    		}
    	}
    
    	/*
    	/// <summary>
    	/// 将读指针向前移动 num 个单元. 如果 num 大于 Avalable,
    	/// 将抛出异常.
    	/// </summary>
    	/// <param name="num">读指针要向前的单元个数.</param>
    	/// <exception cref="ApplicationException">num 大于 Avalable.</exception>
    	public void Seek(int num) {
    	}
    	*/
    
    	/// <summary>
    	/// 未实现.
    	/// </summary>
    	/// <returns></returns>
    	public T ReadOne() {
    		throw new Exception("Not supported.");
    	}
    
    	/// <summary>
    	/// 从缓冲区中读取数据. 读取的字节数一定是 buf.Length 和 Available 的较小者.
    	/// </summary>
    	/// <param name="buf">存储接收到的数据的缓冲区.</param>
    	/// <returns>已经读取的字节数. 一定是 size 和 Available 的较小者.</returns>
    	public int Read(T[] buf) {
    		return Read(buf, 0, buf.Length);
    	}
    
    	/// <summary>
    	/// 从缓冲区中读取数据. 读取的字节数一定是 size 和 Available 的较小者.
    	/// 本方法是线程安全的.
    	/// </summary>
    	/// <param name="buf">存储接收到的数据的缓冲区.</param>
    	/// <param name="offset">buf 中存储所接收数据的位置.</param>
    	/// <param name="size">要读取的字节数.</param>
    	/// <returns>已经读取的字节数. 一定是 size 和 Available 的较小者.</returns>
    	public int Read(T[] buf, int offset, int size) {
    		if (!readSemaphore.WaitOne(readTimeout, false)) {
    			throw new ApplicationException("Read timeout.");
    		}
    
    		lock (bufLock) {
    			int nread = ReadData(buf, offset, size);
    			if (space == 0) {
    				// 释放可写信号.
    				writeSemaphore.Release();
    			}
    			space += nread;
    			available -= nread;
    			if (available > 0) {
    				// 释放一个信号, 以便下一次再读.
    				readSemaphore.Release();
    			}
    			return nread;
    		}
    	}
    
    	/// <summary>
    	/// 把本缓冲区的数据复制指定的数组中, 并移动读指针.
    	/// </summary>
    	private int ReadData(T[] buf, int offset, int size) {
    		int nread = (available >= size) ? size : available;
    		// 当 rd_nxt 在 wr_nxt 的左边时, 缓冲的右边包含的网络字节数.
    		int rdata = capacity - rd_nxt;
    		if (rd_nxt < wr_nxt || rdata >= nread/*隐含rd_nxt >= wr_nxt*/) {
    			Array.Copy(dataBuf, rd_nxt, buf, offset, nread);
    			rd_nxt += nread;
    		} else {
    			// 两次拷贝.
    			Array.Copy(dataBuf, rd_nxt, buf, offset, rdata);
    			rd_nxt = nread - rdata;
    			Array.Copy(dataBuf, 0, buf, offset + rdata, rd_nxt);
    		}
    		return nread;
    	}
    
    	/// <summary>
    	/// 写入数据到缓冲区.
    	/// </summary>
    	/// <param name="buf">要写入的数据的缓冲区.</param>
    	public void Write(byte[] buf) {
    		Write(buf, 0, buf.Length);
    	}
    
    	/// <summary>
    	/// 写入数据到缓冲区. 注意: 本方法不是线程安全的.
    	/// </summary>
    	/// <param name="buf">要写入的数据的缓冲区.</param>
    	/// <param name="offset">数据缓冲区中要写入数据的起始位置.</param>
    	/// <param name="size">要写入的字节数.</param>
    	/// <exception cref="ApplicationException">如果空间不足, 会抛出异常.</exception>
    	public void Write(byte[] buf, int offset, int size) {
    		int n_left = size;
    		int n_offset = offset;
    		int nwrite;
    		int rspace;
    		while (n_left > 0) {
    			// 这样的超时控制并不准确!
    			if (!writeSemaphore.WaitOne(writeTimeout, false)) {
    				throw new ApplicationException("Write timeout.");
    			}
    
    			lock (bufLock) {
    				nwrite = (space >= n_left) ? n_left : space;
    				// 当 rd_nxt 在 wr_nxt 的左边时, 缓冲的右边可以放置的网络字节数.
    				rspace = capacity - wr_nxt;
    				if (wr_nxt < rd_nxt || rspace >= nwrite/*隐含wr_nxt >= rd_nxt*/) {
    					Array.Copy(buf, n_offset, dataBuf, wr_nxt, nwrite);
    					wr_nxt += nwrite;
    					if (wr_nxt == capacity) {
    						wr_nxt = 0;
    					}
    				} else {
    					// 两次拷贝.
    					Array.Copy(buf, n_offset, dataBuf, wr_nxt, rspace);
    					wr_nxt = nwrite - rspace; // 是调用下一句之后的 wr_nxt值.
    					Array.Copy(buf, n_offset + rspace, dataBuf, 0, wr_nxt);
    				}
    				if (available == 0) {
    					readSemaphore.Release();
    				}
    				space -= nwrite;
    				available += nwrite;
    				if (space > 0) {
    					// 释放可写信号.
    					writeSemaphore.Release();
    				}
    
    				n_offset += nwrite;
    				n_left -= nwrite;
    			}
    		} // end while
    
    		/* 不需要 WriteTimeout 的版本.
    		// 和 Read 是对称的.
    		lock (bufLock) {
    			if (space < size) {
    				// TBD: 是否实现写超时机制?
    				throw new ApplicationException("Not enough space.");
    			}
    
    			// 当 wr_nxt 在 rd_nxt 的左边时, 缓冲的右边可以放置的网络字节数.
    			int rspace = capacity - wr_nxt;
    			if (wr_nxt < rd_nxt || rspace >= size) {
    				Array.Copy(buf, offset, dataBuf, wr_nxt, size);
    				wr_nxt += size;
    			} else {
    				// 两次拷贝.
    				Array.Copy(buf, offset, dataBuf, wr_nxt, rspace);
    				wr_nxt = size - rspace;
    				Array.Copy(buf, offset + rspace, dataBuf, 0, wr_nxt);
    			}
    			if (available == 0) {
    				readSemaphore.Release();
    			}
    			space -= size;
    			available += size;
    		}
    		*/
    	}
    }
    
    Posted by ideawu at 2009-02-20 12:36:12
  • 2008-12-04

    Mesh: 对象的多对多关系

    Views: 10937 | No Comments

    在数据库设计中, 有三种映射关系: 一对一, 一对多, 多对多.

    一对一关系几乎没有任何用处, 任何一对一关系都是因为技术的限制, 而不是业务的本质.

    一对多(或者多对一)关系有很大的用处, 因为它能表示树形结构. 不过, 很多重要的情况又不能用树来表示.

    多对多关系表示网状关系, 这才是现实事物的反映. 但是, 多对多比一对多复杂了一个数量级, 如果能使用一对多, 会使事情变得简单. 同理, 一对多比一对一复杂了一个数量级.

    在关系数据库设计中, 一对多使用2个表, 多对多使用3个表. 所以, 关系数据库不能很好地处理多对多关系. 我们需要一种新型的十分适合多对多关系的数据存储和操作工具, 也就是说, 我们需要一种网式数据库.

    Posted by ideawu at 2008-12-04 17:44:08
  • 2008-05-21

    一种有趣的编程模型

    Views: 12624 | 1 Comment

    假设我们从某处接收命令, 一次只能接收一个. 如果命令为 1, 我们只要祈祷, 就可以得到金钱. 如果命令为 2, 那么我们必须到外面 ATM 机取钱, 然后放入钱包.

    这个模型可以用下面的计算机伪代码描述:

    while(cmd = read_cmd()){
        switch(cmd){
            case 1:
                m = pray();
                wallet.put(m);
                break;
            case 2:
                m = get_money_from_atm();
                wallet.put(m);
                break;
        }
    }
    

    从直觉可以知道, get_money_from_atm() 耗时可能达到 1 个小时, 但是 pray() 只需要几秒. 上面的程序是线性的, 所以 get_money_from_atm() 会影响我们接收新的命令, 从而影响我们的钱包快速的增加. 这是一个严重的问题! 下面将讨论如何解决这个问题.

    我们可以委托其他人帮我们去 ATM 机取钱. 所以,

    m = get_money_from_atm();
    wallet.put(m);
    

    可以改为

    delegate(){
        m = get_money_from_atm();
        wallet.put(m);
    }
    

    delegate 代码段中的代码会立即被加入到异步执行队列中(在计算机中常常由其它的线程处理这个队列), 所以我们又可以去接收命令了.

    问题是, wallet.put() 不是线程安全的, 所以, 调用它时必须加锁. 所以代码应该是

    delegate(){
        m = get_money_from_atm();
        lock(wallet){
            wallet.put(m);
        }
    }
    
    wallet.put(m);
    

    也要修改为

    lock(wallet){
        wallet.put(m);
    }
    

    最终的代码是

    while(cmd = read_cmd()){
        switch(cmd){
            case 1:
                m = pray();
                lock(wallet){
                    wallet.put(m);
                }
                break;
            case 2:
                // 这段代码交给另一个线程执行, 所以它几乎不消耗当前线程的时间.
                delegate(){
                    m = get_money_from_atm();
                    lock(wallet){
                        wallet.put(m);
                    }
                }
                break;
        }
    }
    

    这里还有很大的改进空间, case 1 情况比 case 2 出现的机率大得多, 很多时候并没有其它的线程争用 wallet, 但我们还是对它进行加锁, 这些加锁都是无谓的. 我想, 能不能达到下面的效果:

    while(cmd = read_cmd()){
        switch(cmd){
            case 1:
                m = pray();
                wallet.put(m);
                break;
            case 2:
                delegate(){
                    // 将本代码块中的代码委托给其它线程执行.
                    m = get_money_from_atm();
                }then{
                    // 这里的代码会在委托提交后执行.
                    continue;
                }onexit{
                    // 一旦委托的代码执行完毕, 会跳到这里继续执行.
                    wallet.put(m);
                }// 这段几乎不会消耗当前线程的时间.
                break;
        }
    }
    

    委托调用立即返回, 但是, 一旦委托过程处理完毕, 代码又被插入原来的地方继续执行. 也就是说, 一旦 get_money_from_atm() 在另一个线程中执行, 便会产生一个中断, 并执行 onexit 代码段. 当然, 这个中断的级别不能高到中断其它代码的执行, 如 pray().

    有一个可以改进的地方, 就是我们不希望委托的数量过多, 所以为 delegate 增加一个参数, delegate(maxwait=2), 表示如果有 2 个执行中的委托, 便阻塞当前的线程.

    Posted by ideawu at 2008-05-21 19:02:27
|<<<5678910111213>>>| 11/13 Pages, 76 Results.