2013-09-05

150行C代码的comet服务器

Views: 43618 | 39 Comments

Comet 技术就是常见的 Web 服务器"推"技术, 用于向网页实时地推送数据. 最常见的 Comet 技术应用在网页聊天, 当然还可以应用于很多的方面, 如微博更新, 热点新闻推送, 股票即时行情等等, 甚至是网页游戏!

Comet 技术如此重要, 但市面上并没有真正流行通用的 Comet 服务器和解决方案, 比较知名的互联网公司大多是自己开发, 或者基于开源服务器进行二次开发, 例如基于 Jetty(一个开源 Java Web 容器), 而 Facebook 的聊天系统的 Comet 服务器是基于 Mochiweb(一个开源的 Erlang Web 服务器).

当然还有比较知名的以 nginx 模块形式出现的 nginx-push-stream, 但根据实际使用经验, 这个模块无法稳定支撑 10 万个并发连接, 更别谈百万同时在线了. 这也是这个模块为什么没有被普遍大规模应用的原因.

既然大家都开发自己的 Comet 服务器, 那必然有其中的道理, 说是核心技术倒说不上, 不过是为了便于扩展, 能很好地和现有系统整合, 易于运维和管理而已. 那么, 要开发一个 Comet 服务器到底有多难呢? 其实, 一个最简单的 Comet 服务器只需要 150 行 C 语言代码!

先说一下 Comet 技术, 从浏览器支持考虑, long-polling 技术显然是最佳的选择, 又从跨域方面考虑, 那必然是 script tag long-polling 技术获胜. 这也是 Facebook 的选择. 所以, 最简单的 Comet 服务器只支持 Script tag long-polling 即可.

Long-polling 技术要求浏览器的每一个网页和服务器保持一个 HTTP 请求连接(TCP 连接), 服务器收到这样的连接后, 会立即返回 HTTP 首部, 接着通过 chunk 传输编码, 源源不断地将一个个消息发送给浏览器.

一个完整的 chunk 编码的 HTTP 响应如下:

HTTP/1.1 200 OK
Date: Fri, 31 Dec 1999 23:59:59 GMT
Content-Type: text/plain
Transfer-Encoding: chunked

1a; ignore-stuff-here
abcdefghijklmnopqrstuvwxyz
10
1234567890abcdef
0
[blank line here]

只要服务器不返回只有"0"的那一行以及紧接着的空白行, 那么就可以保持向网页推数据.

最简单的 Comet 服务器使用了 libevent 框架, 你可以在这里得到它的代码: https://github.com/ideawu/icomet. 欢迎对 Comet 了解的前端工程师贡献 JavaScript 相关的代码!

使用方式:

订阅: curl -v "http://127.0.0.1:8000/sub?id=12"
推送: curl -v "http://127.0.0.1:8000/pub?id=12&content=hi"

这个 Comet 服务器的最大并发数并没有进行测试, 但 last.fm 的 CTO 对一个同样是基于 libevent 的类似程序进行测试, 100 万连接只需要 2GB 内存.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>
#include <fcntl.h>
#include <unistd.h>
#include <event.h>
#include <evhttp.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <event2/keyvalq_struct.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MAX_CHANNELS    1000

struct Channel{
    int id;
    struct evhttp_request *req;
};

struct Channel channels[MAX_CHANNELS];


void init(){
    int i;
    for(i=0; i<MAX_CHANNELS; i++){
        channels[i].id = i;
        channels[i].req = NULL;
    }
}

// called when user disconnects
void cleanup(struct evhttp_connection *evcon, void *arg){
    struct Channel *sub = (struct Channel *)arg;
    printf("disconnected uid %d\n", sub->id);
    sub->req = NULL;
}

void sub_handler(struct evhttp_request *req, void *arg)
{
    struct evkeyvalq params;
    const char *uri = evhttp_request_get_uri(req);
    evhttp_parse_query(uri, &params);

    struct evbuffer *buf;
    
    int uid = -1;
    struct evkeyval *kv;
    for(kv = params.tqh_first; kv; kv = kv->next.tqe_next){
        if(strcmp(kv->key, "id") == 0){
            uid = atoi(kv->value);
        }
    }
    
    if(uid < 0 || uid >= MAX_CHANNELS){
        buf = evbuffer_new();
        evhttp_send_reply_start(req, HTTP_NOTFOUND, "Not Found");
        evbuffer_free(buf);
        return;
    }
    
    printf("sub: %d\n", uid);
    struct Channel *sub = &channels[uid];
    sub->req = req;

    buf = evbuffer_new();
    evhttp_send_reply_start(req, HTTP_OK, "OK");
    evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");
    
    evbuffer_add_printf(buf, "{type: \"welcome\", id: \"%d\", content: \"hello world!\"}\n", uid);
    evhttp_send_reply_chunk(req, buf);
    evbuffer_free(buf);

    evhttp_connection_set_closecb(req->evcon, cleanup, &channels[uid]);
}

void pub_handler(struct evhttp_request *req, void *arg){
    struct evkeyvalq params;
    const char *uri = evhttp_request_get_uri(req);
    evhttp_parse_query(uri, &params);

    struct evbuffer *buf;
    
    int uid = -1;
    const char *content = "";
    struct evkeyval *kv;
    for(kv = params.tqh_first; kv; kv = kv->next.tqe_next){
        if(strcmp(kv->key, "id") == 0){
            uid = atoi(kv->value);
        }else if(strcmp(kv->key, "content") == 0){
            content = kv->value;
        }
    }
    
    struct Channel *sub = NULL;
    if(uid < 0 || uid >= MAX_CHANNELS){
        sub = NULL;
    }else{
        sub = &channels[uid];
    }
    if(sub && sub->req){
        printf("pub: %d content: %s\n", uid, content);
        
        // push to browser
        buf = evbuffer_new();
        evbuffer_add_printf(buf, "{type: \"data\", id: \"%d\", content: \"%s\"}\n", uid, content);
        evhttp_send_reply_chunk(sub->req, buf);
        evbuffer_free(buf);
        
        // response to publisher
        buf = evbuffer_new();
        evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");
        evbuffer_add_printf(buf, "ok\n");
        evhttp_send_reply(req, 200, "OK", buf);
        evbuffer_free(buf);
    }else{
        buf = evbuffer_new();
        evbuffer_add_printf(buf, "id: %d not connected\n", uid);
        evhttp_send_reply(req, 404, "Not Found", buf);
        evbuffer_free(buf);
    }
}


int main(int argc, char **argv){
    signal(SIGPIPE, SIG_IGN);

    struct event_base *base;
    struct evhttp *http;
    struct evhttp_bound_socket *handle;
    unsigned short port = 8000;
    
    init();

    base = event_base_new();
    http = evhttp_new(base);
    evhttp_set_cb(http, "/sub", sub_handler, NULL);
    evhttp_set_cb(http, "/pub", pub_handler, NULL);
    //evhttp_set_gencb(http, request_handler, NULL);
    handle = evhttp_bind_socket_with_handle(http, "0.0.0.0", port);
    printf("server listen at 127.0.0.1:%d\n", port);
    event_base_dispatch(base);

    return 0;
}

Related posts:

  1. Libevent 2 HTTP 客户端示例
  2. C#封装log4net
  3. 使用 jemalloc 编译过程出错的问题
  4. Ideawu.P2P API 简介
  5. 基于列的数据库
Posted by ideawu at 2013-09-05 21:42:36

39 Responses to "150行C代码的comet服务器"

  • 按照https://github.com/ideawu/icomet这上面的在centos下配置好后,在centos中自带的firefox浏览器中访问http://www.test.com/icomet/sub?cname=xxx(本机访问本机),要等20多秒才有结果输出,为何这么慢呢? Reply
    @confu: 因为这就是 long-polling! Reply
    @ideawu: 多谢,看了这篇文章后http://feilong.me/2011/07/talk-about-polling-and-long-polling终于对什么是长轮询有了透彻的理解了 Reply
  • nginx-push-stream在我司有超过100万连接的例子,虽然不是很好用,但是简单。 Reply
  • 刚试了下,如果启动时以:nohup ./comet-server comet.conf &
    方式来启动,即使关掉SecureCRT进程还在,能解释下原因吗? Reply
  • 目前发现个问题,以./comet-server comet.conf & 启动后,确实也启动了,但如果把终端连接服务器的工具SecureCRT关掉了,进程也一起关了,这是啥问题? Reply
  • ./comet-server comet.conf,这个启动时如何作为后台服务启动? Reply
    @confu: Hi, 可以加 -d 选项, ./comet-server -d comet.conf Reply
  • 测试icomet的chat DEMO 发现存在数据定时重复推出的现象,是不是小bug? Reply
    @sea: 竟然有这种情况? 能否说一下你的使用方法, 特别是浏览器类型和版本. Reply
    @ideawu: ie8打开2个窗口,运行demo,链接如下:
    http://127.0.0.1/php/web/chat.html?channel=channelName
    然后发每个窗口发几条信息就出现了,是不是同一channel在一台机上使用的问题? Reply
    @sea: Hi, 你能进行下面的操作, 帮助我检查一下问题吗?

    1. 启动 icomet, 用 ie8 打开两个窗口, 和你原来的操作一样. 当出现问题时, 麻烦把 log.txt 发给我的邮箱.

    2. 换浏览器, 如 firefox, chrome, 进行同样的操作, 看是否还有问题. Reply
    @ideawu: 我们在使用过程中也发现重复推送的问题了。
    没有使用浏览器,是通过client4j来使用的。我们查看了后台日志,确实推送了两次。 Reply
  • 正在了解comet相关技术,没想到又是博主的!顶一个 Reply
    @xmxoxo: 哈哈, how old are you?(怎么老是你?) Reply
    @ideawu: 真是有缘啊,百度找comet技术的时候又逛到了你这里!
    正要实现一个类似于实时大盘数据的系统,跟聊天室有点不同,
    数据就是固定个数的。正在看icomet的相关文档,
    服务端发布数据也是通过访问 http://127.0.0.1:8000/pub?id=12&content=hi 这样子来发布么? Reply
    @xmxoxo: icomet 用来做实时大盘也是它的设计考虑之一. 数据发布, 可以直接调用 pub 接口, 也可以用 php 等封装一下, 由 php 来调用, 这样可以在 php 中做身份验证. Reply
    @ideawu: 数据发布是可以用PHP封装一下,那订阅呢,我看DEMO里是直接连服务器的8100端口的,在服务端能否有个验证之类的机制? Reply
    @xmxoxo: 订阅也是有验证的. 你可以将 icomet.conf 中的验证方式改为 token, 然后用 php 封装一下 sign 接口, 在里面做验证. 源码中有示例用的 pub.php, sign.php 两个文件, 你可以参考下.
  • 挺强大的,还使用了libevent框架。不过,我倒是没用过libevent,有空研究下代码,再看libevent。 Reply
    @armsword: 正因为使用了libevent才能150行代码, 不然, 光是解析HTTP就是几百上千行代码. libevent 是个不错的网络框架. Reply

« [1][2] » 2/2

Leave a Comment