存档

文章标签 ‘epoll’

Common Lisp使用iolib进行网络编程

一月 4th, {2012 2 条评论 15,856 人阅读过  

Common Lisp进行网络编程可用的库还是挺多的,比较常用的库有usocketiolib,usocket我简单了解了一下没有真正拿来用,它的API比较简单,文档写得比较全面,相比之下,iolib要比usocket强大的多,但缺点是文档太少,官方的文档可用的内容非常少,但如果能阅读一下iolib的相关源码,就会发现其实iolib是一个很强大的网络编程库,其中包含了DNS解析,socket基本操作(bind,listen等等),IO多路复用以及通常用来做IPC的socketpair,而且iolib的multiplex用起来有种libevent的感觉,用iolib可以实现一般的应用层网络编程,至于是否支持raw socket,我还没仔细研究,不过感觉应该问题不大。

1.iolib的安装
使用asdf-install可以在线安装iolib,但貌似asdf-install不会自动解决包的依赖问题,最近才发现原来asdf-install其实已经是一个废弃的项目,官方已经不推荐使用了,在cliki的asdf-install首页最开头就有一句醒目的提示语:

ASDF-install is OBSOLETE. DO NOT USE ASDF-INSTALL, EVER. DO NOT ASK AROUND ABOUT HOW TO GET IT RUNNING. IT IS O-B-S-O-L-E-T-E. Not working. Not maintained. Please use quicklisp instead.

取而代之的是quicklisp,之前就有人跟我推荐过quicklisp我还没来得及尝试,这几天试了下确实非常方便,可以自动地下载程序包及其依赖的相关程序包,无需手工解决依赖问题,让我想到debian的apt-get,关于quicklisp的安装和使用都非常简单,在它首页上都有使用说明。而且quicklisp几乎每个月都会在官方blog上放出过去的几个月程序包的下载排行(如:Project download stats for November),可以在选择程序包的时候有个参考。

2.创建passive socket

当创建一个用于充当server角色的程序时通常需要创建passive socket,用来监听客户端的连接,关于socket编程的基本步骤已经是大家所熟知的了,create socket,bind,listen,accept等等,iolib是使用cffi(The Common Foreign Function Interface)通过调用linux系统调用来实现的,因此和用C语言编程几乎是一个套路,方法如下:

(setq socket
      (make-socket
       :connect :passive
       :address-family :internet
       :type :stream
       :external-format '(:utf-8 :eol-style :crlf)))
 
(bind-address socket
              (ensure-address "127.0.0.1")
              :port 1086
              :reuse-addr t)
 
(listen-on socket :backlog 10)
 
(setq client (accept-connection socket))
 
(multiple-value-bind (who port) (remote-name socket)
      (format t "Client ~A:~D connected.~%" who port))
 
(close socket)

几个操作一目了解,更细节的操作(比如如何创建UDP套接字)就去翻下源码好了,bind-address这个这个函数第二个参数用ensure-address把字符串转换成所需要的address类型,iolib也定义了一系列形如+ipv4-unspecified+的静态变量,类似于C语言里面的INADDR_ANY,最后一个参数reuse-addr相当于用setsockopt对套接字设置SO_REUSEADDR选项。

对于iolib的passive我一直有一个问题未能解决,当程序作为server在监听客户端连接时,在C语言中可以使用CTRL+C给程序发送SIGINT信号让程序终止,排除TIME_WAIT等这些情况,程序再次启动时仍可以bind到同一个指定端口(即使没有显示地调用close关闭套接字),但在slime中使用C-c C-c终止程序,并确保在slime-selecter中已经结束掉所有的用户线程之后,再次启动程序绑定同一个端口便会提示端口已被占用,除非结束掉lisp进程(sbcl/ccl等),我也就直接选择重启emacs,这个问题一直未能解决,困扰我很长时间,所以我只能在程序运行中不断地插入close来在不该关闭的地方临时关闭套接字。

3. 创建active socket

通常使用active socket的程序是作为客户端的角色,下面是我写的一段简单的示例代码,用于发送一个http请求:

(let (socket ip http)
  (setf socket (make-socket
              :connect :active
              :address-family :internet
              :type :stream
              :external-format '(:utf-8)
              :ipv6 nil))
 
  (setf ip (lookup-hostname "basiccoder.com"))
  (format t "IP of ~a is: ~a~%" host ip)
 
  (connect socket ip :port 80 :wait t)
  (format t "Connected to ~a via ~a:~a to ~a:~a~%"
          host (local-host socket) (local-port socket)
          (remote-host socket) (remote-host socket))
 
  (setf http (make-http-request "GET" "/" host
               (("Connection" "Closed")
                ("User-Agent" "Mozilla"))))
 
  (format t "send: ~A~%" http)
 
  (format socket http)
  (finish-output socket))

使用lookup-hostname来解决IP地址,通过connect来向远程服务器进行连接,make-http-request是我写的一个生成http请求头的一个宏,返回http请求字符串;创建的socket对象其实是一个流对象,因此可以使用format向流中写入数据,写入的数据会保存在缓冲区中,当调用(finish-output socket)函数时开始执行数据的发送操作。iolib也提供了send-to函数,不详细讨论了。

4. 从流中读取数据
由于socket对象是一个流对象,因此可以用任何从流中读出数据的方法来从socket中接收数据,如read-line,read-byte等等,但read-line存在一个问题,当使用read-line读取数据时,当数据中存在非ASCII字符中便会抛出异常,它在读取的过程中会对数据进行ASCII解码,而read-byte则不会存在这个问题,因为它读出的是二进制的字节,它不关心编码方式,但read-byte我感觉很多情况下是不太适用的,因为它一次只能读取一个字节,一般情况下多次执行这样的操作效率不会太高,当然,iolib也提供了receive-from函数,间接地调用了系统调用recvfrom(),是一种带缓冲区的接收方式,也比较符合C语言的编程习惯。

receive-from的使用方法如下:

(multiple-value-bind (buf-vector rbytes)
          (receive-from socket :buffer buf-vector
                               :start 0 :end 4096
                               :size 4096)

receive-from返回的是values,主返回值是包含接收到的数据的vector,另一个返回值是读取到的字节数,函数调用的参数里面:buffer不是必须的,当:buffer未指定时,则需要指定:size参数,这时receive-from会自动创建一个指定大小的vector并将数据填充后返回。receive-from返回的vector中保存的是字节码值,并不是字符串,可以使用octets-to-string函数将其转换成string,具体可以参考下我的这篇日志:Common Lisp为Babel添加GBK支持

5. IO多路复用

iolib提供了multiplex机制,原理也是对epoll/poll/kqueue进行了封装,我在linux下测试默认是用的epoll,使用方法和libevent非常相似,首先要创建一个全局的event base:

(setf *http-event-base*
        (make-instance 'iomux:event-base))
</lisp>
 
将要进行利用的socket对象添加到该event base中,使用set-io-handler函数:
<pre lang="lisp">
(set-io-handler *http-event-base*
                (socket-os-fd socket)
                :read
                (make-http-event-loop conn client)
                :one-shot t)

第三个选项:read表示监听套接字是否有数据可读,同类的选项还有:write和:error,第四个参数是事件发生是要执行的回调函数,由于lisp中没有类似于C语言中的void*这种方式,不会像C语言一样给回调函数通过一个指针来传递相关的参数,但lisp的高阶函数使用传递额外参数更加方便了,上述代码中的make-http-event-loop函数的返回值是一个lambda函数,用来作为set-io-handler的回调函数,而 conn和clinet两个参数可以通过make-http-event-loop传递给lambda函数:

(defun make-http-event-loop (conn client)
  (lambda (fd event exception)
     (format t "event ~A on fd(~D) with connection
:~A client :~A" event fd conn client)))

最后,调用event-dispatch函数来进入事件循环:

(event-dispatch *http-event-base*)
(when *http-event-base*
  (close *http-event-base*))

6. iolib的其它参考文档

分类: Lisp 标签: , ,

Redis的事件循环与定时器模型

十月 7th, {2011 1 条评论 16,592 人阅读过  

假期的最后一天,简单翻阅了下Redis的源码,读一款server软件的源码我一般是从进程/线程模型开始的,Redis让我有些诧异,它采用了单进程单线程的模型,一般的server软件都会采用多进程或者多线程再或者多线程多进程混合的模型来设计,从而充分利用多核处理器的并行计算能力来提高软件的性能,Redis这种模型我只能推断程序的可并行化程度不高,顺序计算反而能省去多线程同步和维护线程池/进程池的开销,我对于数据库server端的设计没有什么经验也没有太多的理解,如有谬误欢迎大家指正。

当然,这里要写的不是关于Redis的进程模型,而是Redis的事件模型和定时器模型。

Redis没有依赖libevent,而是自己通过IO多路复用的方式来实现了事件循环和定时器,不像nginx或者apache有多种多路复用方式可供选择,Redis只采用了三种:epoll/kqueue/select,默认采用epoll,在linux环境下最优的方式当然是epoll,当在FreeBSD平台下epoll不存在时则使用kqueue,当然若两种方式都未定义则使用性能最差的select,我只阅读了跟epoll相关的代码。

main()函数的最后调用了aeMain()这个函数进入Redis的事件循环,这个函数的很简单,循环调用aeProcessEvents()来对事件进行处理:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

在此之前Redis做了很多初始化的工作,这些工作大多是在initServer()这个函数中执行的,初始化一些相关的list,dict等,调用aeCreateEventLoop()初始化eventloop,这个函数初始化eventloop相关的数据结构,并最终调用了epoll_create()函数,对epoll上下文进行初始化。紧接着Redis创建了用于listen的socket对象,并调用aeCreateFileEvent()把该socket描述符的读事件加入到事件池中去,另外,还调用了aeCreateTimeEvent()函数来初始化一下定时器,定期地执行serverCron()这个函数,接下来看一下aeCreateFileEvent()aeCreateTimeEvent()这两个函数。

aeCreateFileEvent()这个函数初始化aeFileEvent结构(该结构保存事件的一些状态,以及事件的文件描述符等),并调用aeApiAddEvent()函数将描述符相关的事件添加到事件池中,对于epoll它的实现如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
 
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

非常简洁,这个函数只不过是把epoll_ctl()相关的操作做了一下封装,至此描述符已经加入到事件池中进行监听了,接着看aeCreateTimeEvent()这个函数。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
 
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

同样是初始化数据结构,但没有调用aeApiAddEvent()这个函数,当然,定时器又不需要文件描述符,当然不需要添加相关事件,定时器的实现只是使用了epoll_wait()的定时功能,aeAddMillisecondsToNow()这个函数顾名思义是把当前时间加上一个给定的毫秒数,然后算出一个when_sec和when_ms,eventloop对象的timeEventHead实际上是一个单向链表,它用于保存所有的定时器事件,当添加一个定时器事件时其实只是向该链表头中插入了一个元素,其会后由aeProcessEvents()这个函数遍历该链表取出超时的事件进行处理,接着我们看下这个事件处理里面最核心的函数。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
 
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
 
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
 
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
 
            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to se the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
 
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
 
	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
 
    return processed; /* return the number of processed file/time events */
}

这其中的aeApiPoll()这个函数其实就是对epoll_wait()操作的一个封装,epoll_wait()的最后一个参数是一个毫秒级的超时时间,Redis充分利用了这个时间在对IO事件进行监听的同时来实现了定时。这个函数的前面一大部分代码是在计算这个超时时间的,它调用aeSearchNearestTimer()这个函数来获取最近要超时的一个定时器对象,如何获取的呢?就是遍历刚才的提到的那个timeEventHead链表,来找出时间值最小的一个,注意是遍历,因为链表中的定时器也是无序的,不过我相信作者有一天会把它换成红黑树或者其它的数据结构吧。如果找到一个将要超时的定时器,则将它与当前时间进行比较,如果当前时间大于定时器时间则表示定时器已超时,将超时时间设为0,若当前时间小于定时器时间,则将超时时间设为两者之差。如果定时器队列为空,或者说没有任何定时器事件,则可以根据AE_DONT_WAIT这个标志来决定epoll_wait()是non-blocking立即返回,还是一直阻塞在那里。

aeApiPoll()函数返回时,有两种情况,一种是IO事件被触发,另一种是定时器超时,当IO事件被触发时,遍历所有活跃描述符并调用相关的回调函数对其进行处理。当没有IO事件被触发,而是超时时,则返回值numevents为0,函数会转向processTimeEvents()来遍历定时器列表,调用定时器回调函数处理定时器事件,当IO事件被触发而并没有定时器超时时,如果设置了AE_TIME_EVENTS标志则也会对定时器列表进行遍历,主循环便是如此,我认为这会多少对效率有一定的影响,当然可能现在的Redis定时器列表并不太大,所以效率问题也可以忽略。

以上是简单地对今天的工作做的总结,欢迎大家批评指教。

分类: NoSql 标签: , ,