2013-09-05

150行C代码的comet服务器

Views: 44300 | 39 Comments

Comet 技术就是常见的 Web 服务器"推"技术, 用于向网页实时地推送数据. 最常见的 Comet 技术应用在网页聊天, 当然还可以应用于很多的方面, 如微博更新, 热点新闻推送, 股票即时行情等等, 甚至是网页游戏!

Comet 技术如此重要, 但市面上并没有真正流行通用的 Comet 服务器和解决方案, 比较知名的互联网公司大多是自己开发, 或者基于开源服务器进行二次开发, 例如基于 Jetty(一个开源 Java Web 容器), 而 Facebook 的聊天系统的 Comet 服务器是基于 Mochiweb(一个开源的 Erlang Web 服务器).

当然还有比较知名的以 nginx 模块形式出现的 nginx-push-stream, 但根据实际使用经验, 这个模块无法稳定支撑 10 万个并发连接, 更别谈百万同时在线了. 这也是这个模块为什么没有被普遍大规模应用的原因.

既然大家都开发自己的 Comet 服务器, 那必然有其中的道理, 说是核心技术倒说不上, 不过是为了便于扩展, 能很好地和现有系统整合, 易于运维和管理而已. 那么, 要开发一个 Comet 服务器到底有多难呢? 其实, 一个最简单的 Comet 服务器只需要 150 行 C 语言代码!

先说一下 Comet 技术, 从浏览器支持考虑, long-polling 技术显然是最佳的选择, 又从跨域方面考虑, 那必然是 script tag long-polling 技术获胜. 这也是 Facebook 的选择. 所以, 最简单的 Comet 服务器只支持 Script tag long-polling 即可.

Long-polling 技术要求浏览器的每一个网页和服务器保持一个 HTTP 请求连接(TCP 连接), 服务器收到这样的连接后, 会立即返回 HTTP 首部, 接着通过 chunk 传输编码, 源源不断地将一个个消息发送给浏览器.

一个完整的 chunk 编码的 HTTP 响应如下:

HTTP/1.1 200 OK
Date: Fri, 31 Dec 1999 23:59:59 GMT
Content-Type: text/plain
Transfer-Encoding: chunked

1a; ignore-stuff-here
abcdefghijklmnopqrstuvwxyz
10
1234567890abcdef
0
[blank line here]

只要服务器不返回只有"0"的那一行以及紧接着的空白行, 那么就可以保持向网页推数据.

最简单的 Comet 服务器使用了 libevent 框架, 你可以在这里得到它的代码: https://github.com/ideawu/icomet. 欢迎对 Comet 了解的前端工程师贡献 JavaScript 相关的代码!

使用方式:

订阅: curl -v "http://127.0.0.1:8000/sub?id=12"
推送: curl -v "http://127.0.0.1:8000/pub?id=12&content=hi"

这个 Comet 服务器的最大并发数并没有进行测试, 但 last.fm 的 CTO 对一个同样是基于 libevent 的类似程序进行测试, 100 万连接只需要 2GB 内存.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>
#include <fcntl.h>
#include <unistd.h>
#include <event.h>
#include <evhttp.h>
#include <event2/event.h>
#include <event2/http.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <event2/keyvalq_struct.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MAX_CHANNELS    1000

struct Channel{
    int id;
    struct evhttp_request *req;
};

struct Channel channels[MAX_CHANNELS];


void init(){
    int i;
    for(i=0; i<MAX_CHANNELS; i++){
        channels[i].id = i;
        channels[i].req = NULL;
    }
}

// called when user disconnects
void cleanup(struct evhttp_connection *evcon, void *arg){
    struct Channel *sub = (struct Channel *)arg;
    printf("disconnected uid %d\n", sub->id);
    sub->req = NULL;
}

void sub_handler(struct evhttp_request *req, void *arg)
{
    struct evkeyvalq params;
    const char *uri = evhttp_request_get_uri(req);
    evhttp_parse_query(uri, &params);

    struct evbuffer *buf;
    
    int uid = -1;
    struct evkeyval *kv;
    for(kv = params.tqh_first; kv; kv = kv->next.tqe_next){
        if(strcmp(kv->key, "id") == 0){
            uid = atoi(kv->value);
        }
    }
    
    if(uid < 0 || uid >= MAX_CHANNELS){
        buf = evbuffer_new();
        evhttp_send_reply_start(req, HTTP_NOTFOUND, "Not Found");
        evbuffer_free(buf);
        return;
    }
    
    printf("sub: %d\n", uid);
    struct Channel *sub = &channels[uid];
    sub->req = req;

    buf = evbuffer_new();
    evhttp_send_reply_start(req, HTTP_OK, "OK");
    evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");
    
    evbuffer_add_printf(buf, "{type: \"welcome\", id: \"%d\", content: \"hello world!\"}\n", uid);
    evhttp_send_reply_chunk(req, buf);
    evbuffer_free(buf);

    evhttp_connection_set_closecb(req->evcon, cleanup, &channels[uid]);
}

void pub_handler(struct evhttp_request *req, void *arg){
    struct evkeyvalq params;
    const char *uri = evhttp_request_get_uri(req);
    evhttp_parse_query(uri, &params);

    struct evbuffer *buf;
    
    int uid = -1;
    const char *content = "";
    struct evkeyval *kv;
    for(kv = params.tqh_first; kv; kv = kv->next.tqe_next){
        if(strcmp(kv->key, "id") == 0){
            uid = atoi(kv->value);
        }else if(strcmp(kv->key, "content") == 0){
            content = kv->value;
        }
    }
    
    struct Channel *sub = NULL;
    if(uid < 0 || uid >= MAX_CHANNELS){
        sub = NULL;
    }else{
        sub = &channels[uid];
    }
    if(sub && sub->req){
        printf("pub: %d content: %s\n", uid, content);
        
        // push to browser
        buf = evbuffer_new();
        evbuffer_add_printf(buf, "{type: \"data\", id: \"%d\", content: \"%s\"}\n", uid, content);
        evhttp_send_reply_chunk(sub->req, buf);
        evbuffer_free(buf);
        
        // response to publisher
        buf = evbuffer_new();
        evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");
        evbuffer_add_printf(buf, "ok\n");
        evhttp_send_reply(req, 200, "OK", buf);
        evbuffer_free(buf);
    }else{
        buf = evbuffer_new();
        evbuffer_add_printf(buf, "id: %d not connected\n", uid);
        evhttp_send_reply(req, 404, "Not Found", buf);
        evbuffer_free(buf);
    }
}


int main(int argc, char **argv){
    signal(SIGPIPE, SIG_IGN);

    struct event_base *base;
    struct evhttp *http;
    struct evhttp_bound_socket *handle;
    unsigned short port = 8000;
    
    init();

    base = event_base_new();
    http = evhttp_new(base);
    evhttp_set_cb(http, "/sub", sub_handler, NULL);
    evhttp_set_cb(http, "/pub", pub_handler, NULL);
    //evhttp_set_gencb(http, request_handler, NULL);
    handle = evhttp_bind_socket_with_handle(http, "0.0.0.0", port);
    printf("server listen at 127.0.0.1:%d\n", port);
    event_base_dispatch(base);

    return 0;
}

Related posts:

  1. Libevent 2 HTTP 客户端示例
  2. C#封装log4net
  3. 使用 jemalloc 编译过程出错的问题
  4. Ideawu.P2P API 简介
  5. 基于列的数据库
Posted by ideawu at 2013-09-05 21:42:36

39 Responses to "150行C代码的comet服务器"

  • 如何手动关闭某个通道? Reply
    @confu: 在另一篇BLOG上看到了curl -v “http://127.0.0.1:8100/close?cname=12″ Reply
  • icomet如何监听某条消息是否成功推送到了APP客户端?是不是需要加下回调的功能? Reply
    @confu: 如果有这种需求, 可以在 icomet 之外做, 比如在 js 回调函数中, 调用一次服务器的某个接口. Reply
  • ideawu,把你的QQ号或者其它聊天帐号发给我一个,你要不方便公司就发到我邮箱untxok@163.com吧,通过BLOG留言反馈太慢了,多谢了 Reply
    @confu: 可以用 gmail 聊吧, 在我的个人简历里有邮箱地址. Reply
  • libevent通过bufferevent_enable可以知道客户端是否已断开,那么在icomet中通过调用什么方法才能知道客户端是否已断开?多谢! Reply
  • icomet有定期清理管理的机制么? Reply
    @confu: 有的. 每一个通道一旦空闲达到一定时间, 便会被清理. Reply
  • 如何统计在所有管道中正在轮询的用户数量? Reply
  • 在2个不同的服务器上尝试推送信息,发现一台必须把"\"编码成%5c 另一台却不能编码
    请问博主是设置的问题么,如何解决? Reply
  • 请教个问题,如果客户端APP上的Andriod或IOS调用远程服务器上的icomet,订阅应该是调用类似的:http://www.xx.com/web/php/sign.php?cb=icomet_cb_0&cname=84028170900653324994&_=1388402817063&callback=cb

    发消息调用类似的:http://confu.yoocai.com/web/php/pub.php?cb=jQuery191030807198207848163_1388402817072&encoded=1&cname=84028170900653324994&content=%7B%5C%22uid%5C%22%3A%5C%22u131041%5C%22%2C%5C%22nickname%5C%22%3A%5C%22u131041%5C%22%2C%5C%22content%5C%22%3A%5C%22hello+every+one!%5C%22%7D

    刚才提问的发消息的链接发错了

    以上理解是否正确? Reply
    @confu: Hi, 订阅应该是 sub.php, sign.php 是用来验证身份并获取 token 的接口. Reply
    @ideawu: 请问下sub.php在这上面https://github.com/ideawu/icomet的哪个文件夹中? Reply
  • 请教个问题,如果客户端APP上的Andriod或IOS调用远程服务器上的icomet,订阅应该是调用类似的:http://www.xx.com/web/php/sign.php?cb=icomet_cb_0&cname=84028170900653324994&_=1388402817063&callback=cb

    发消息调用类似的:http://confu.yoocai.com:8100/sub?cb=icomet_cb_0&cname=84028170900653324994&seq=1&noop=23&token=2ba36125075e066b39c73f7b753e551f&_=1388402817090&callback=cb

    以上理解是否正确? Reply
  • 订阅成功后返回的是串是:icomet_cb_0({type: "sign", cname: "84028170900653324994", seq: 0, token: "2ba36125075e066b39c73f7b753e551f", expires: 30, sub_timeout: 30}),这种是什么格式的串?肯定不是JSON的 Reply
    @confu: 这是 jsonp. Reply

« [1][2] » 1/2

Leave a Comment