首页 > NoSql > Memcached的线程模型及状态机

Memcached的线程模型及状态机

2011年9月5日 发表评论 阅读评论 13,912 人阅读过  

Memcached是一种应用较广泛的分布式内存对象缓存系统,应用之余总想了解它的实现机理,这也就是开源的好处,以至于每接触一款优秀的开源软件都有去阅读它源代码的冲动,Memcached-1.4.7的代码量还是可以接受的,只有10K行左右,我比较关心的两个方面还是它的进程(线程)管理机制和内存管理机制,这里先简单写一下我对Memcached进程管理方面的理解。

Memcached使用libevent实现事件循环,libevent在Linux环境下默认采用epoll作为IO多路复用方法,这个不重要,接下来要讨论的是Memcached的进程管理模型。

Memcached采用了很典型的Master-Worker模型,采用的是多线程而不是多进程,而线程之间没有冗余的共享数据,这样便降低了多线程进行线程同步的开销,核心的共享数据是消息队列,主线程会把收到的事件请求放入队列,随后调度程序会选择一个空闲的Worker线程来从队列中取出事件请求进行处理。

在main()函数里面,Memcached为主线程调用event_init()创建了一个libevent base对象,随后调用thread_init()来初始化线程,我们来看下这个函数的实现。

void thread_init(int nthreads, struct event_base *main_base) {
    int         i;
 
    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);
 
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);
 
    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;
 
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }
 
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();
 
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
 
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];
 
        setup_thread(&threads[i]);
    }
 
    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
 
    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}

代码很简单,初始化几个全局锁和一个条件变量,init_lock和init_cond这一对锁/条件变量的作用很明显,它们的作用是用来等待所有的worker线程启动完毕后,thread_init()才可以继续执行,见该函数最后的几句,非常通用的用法,等待init_count数达到预定的线程数后主线程方可继续执行,否则就一直在wait,每创建一个worker线程就会让init_count的值加1,创建worker线程的工作是在setup_thread()函数中进行的,这个后面讨论。

thread_init()在初始化完全局锁并为线程池分配分配空间之后便开始对线程池中的每个线程进行初始化,哦,在这之前把全局的调度线程设置成为主线程,OK,接下来就开始遍历线程池,每个线程都有一对notify_fd,它们通过管道连接,这个管道便是主线程对线程池中的工作线程进行调度的接口,看一下setup_thread()这个函数的实现(节省版面,只保留了关键代码)。

static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }
 
    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
 
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
 
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);
}

首先调用event_init()为该线程初始化一个事件池,我们知道libevent的事件池(event base)不是线程安全的,所以每个线程需要有自己的事件池,所有的IO事件都由这个事件池来处理,在这个函数里面,首先将notify_receive_fd添加到事件监听循环中,设置回调函数thread_libevent_process(),当主线程向该worker线程通过pipe IPC发送消息时,便会触发该worker线程执行thread_libevent_process()函数。随后setup_thread()函数创建connection队列并将其初始化。

接着回过头看thread_init()函数,初始化线程池结束以后,所需要做的工作便是为线程池创建对应的线程,也就是调用了create_worker()函数,这个函数只不过把pthread_create()做了一个简单的封装,至少到目前为止里面没有什么特别的代码,线程的执行函数是worker_libevent(),该函数将全局线程数加一,随后便调用event_base_loop(),完全将事件循环交给了libevent,当然到目前为止libevent的事件池中只有用来进行IPC的管道文件描述符(说到这里,当时我还在奇怪呢,代码执行这里还并没有添加socket描述符到事件池里面,那event_loop岂不一执行就结束了,后来才反应过来里面有IPC管道文件描述符了),那么这些描述符是在什么时候添加的呢?刚才已经讨论过了,是在setup_thread()函数里面,它的事件回调函数是thread_libevent_process(),OK,接下来我们看看这个函数吧。

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
 
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
 
    item = cq_pop(me->new_conn_queue);
 
    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            close(item->sfd);
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}

函数的一开始便从fd中读取一个字节,不在乎这一个字节是什么,只是主线程发过来的一个单字节(实际上是个\0),用来触发fd的READ事件,主线程通过往worker线程的notify_write_fd中写入一个单字节来实现对worker线程的调度,因此这个函数里面首先要先把这一个字节读取出来,否则会影响以后的事件循环。紧接着从connection队列中弹出一个item,然后根据这个item的信息再创建一个connection,这里的conn还是不要理解为连接的好,理解成任务更好一些,conn对象中有读写缓冲区,状态(用来实现Memcached的有限状态机)等,这conn_new()这个函数里面会创建一个conn对象并对它进行初始化,但最重要的操作是设置对socketfd的事件监听函数,event_handler(),这个函数又调用drive_machine(),其中就实现了Memcached的有限状态机,通过对state的不同值来进行不同的操作,具体的状态如下:

const char* const statenames[] = { "conn_listening",
						      "conn_new_cmd",
						        "conn_waiting",
							"conn_read",
							"conn_parse_cmd",
							"conn_write",
							"conn_nread",
							"conn_swallow",
							"conn_closing",
							"conn_mwrite" };

关于状态机是怎么运行的就不讨论了,代码篇幅过长,涉及到的细节太多,不展开说了,要说的一点是状态机在哪里进入的conn_listening状态,这是某个socketfd的起始状态,socket需要先监听然后accept,再之后才可以进行read,write等操作,listen必然会在程序初始化的过程中调用,我们可以看到在main()函数里面线程初始化结束以后,便开始对socket描述符进行初始化,获取系统存在几个可用的地址信息,然后创建socket描述符,再然后bind(),再之后调用listen(),这些操作都是在server_socket()函数中执行的,该函数初始化监听socket,并针对服务器做一些socket options的设置,最后最关键的一步是针对该socket调用了conn_new(),但事件池是main_base,也就是说accept()事件是由主线程接收到的,accept()是发生在主线程内,这样也就避免了多线程accept()的惊群,accept()函数的执行也是在状态机循环中执行的,drive_machine()函数中,accept()完成后,主线程便调用dispatch_conn_new()对该描述符进行调度,状态机的代码太长,不列出来了,看一看dispatch_conn_machine()的代码吧。

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;
 
    LIBEVENT_THREAD *thread = threads + tid;
 
    last_thread = tid;
 
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
 
    cq_push(thread->new_conn_queue, item);
 
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

创建一个item并压入队列,然后通过一个简单的哈希从线程池中找出一个线程,写一个空字符来激活这个线程(前面讨论过了),接着被激活的线程就会从conn_queue中弹出一个item来进行处理。

OK,关于Memcached的线程模型要说的就这些吧,我自己读源码时对它的浅显的理解,欢迎大家批评讨论。

原创文章,转载请注明: 转载自basic coder

本文链接地址: http://basiccoder.com/thread-model-and-state-machine-of-memcached.html

分类: NoSql 标签: , ,
  1. imouse
    2011年10月5日22:17 | #1

    嗯,不错的代码,涉及异步IO/线程池/线程同步/经典的master-slave thread的模式
    看过ngircd的代码,
    1. 它只有一个主线程,建立了一个全局的event表和一个IO library(类似libevent).
    即使是listen socket也会放到event事件表中。事件表中包含socket描述符和回调函数。
    2. 对每个产生的链接放入到IO library去epoll/select/poll/(宏定义隔开)中,当client发来irc命令后,立即能处理。

    看了这篇后,发现ngircd可以修改成线程池的方式,可能会高效些,前提是处理好同步问题。呵呵!

    • 2011年10月5日22:27 | #2

      单线程的好处是能省去同步的开销,而且比较好维护,但不能充分利用多核处理器的高并发,pidgin也是用的单线程,我觉得这种client软件这样还是挺好的。
      但server端还是尽可能用多线程或多进程的好,要高效嘛,memcached用消息队列尽量减少线程间的资源共享,线程之间使用IPC进行通信,这样就减少了线程同步的开销,我觉得这种方法还是挺不错的 :)

  2. imouse
    2011年10月6日11:45 | #3

    同意你的说法。尽量的多线程或多进程很适合web,游戏等服务器。
    在嵌入式开发中,更多业务逻辑是用单个任务来完成,任务内部逻辑上分解成不同的处理消息的回调函数,
    并且对不同的消息处理函数调用时按照给定优先级进行调度,这样可以让CPU能充分发挥
    本身的性能。当然,回调函数中时不允许延迟处理,延迟处理的事物会交给单独任务来处理。

    通过一段时间的观察测试,发现这样的模型在比以前用的多任务处理效率高20%。(以前的模型跟上面
    你讲的类似,通知线程换成了二进制信号量而已,也有队列。)

    这么好的代码,准备把它代码裁减改写一下,变的更通用些,收藏,啥时候要做个XX服务器,拿来填充一下业务处理层就好了。哈哈!

    • 2011年10月6日11:55 | #4

      嗯,我觉得Memcached的代码还是不错的,不管是编码风格还是代码结构都很好,而且现在只有2w多行,部分代码拿来复用也不错

  3. 刀刀
    2011年10月14日14:39 | #5

    非常感谢,很受益。有一个地方我不太明白,请教当conn_read读取完毕后,设置成conn_parse_cmd状态,然后这个线程如何被唤醒?再次感谢

  4. 2011年11月23日18:00 | #6

    博主怎么联系你啊,memcache的代码正在读,遇到些问题

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。