并发模型,Memcached网络模型

swoole的完整数据流程图能够看官方文书档案swoole运营流程

memcached是一个出色的单进度系统。尽管是单进度,不过memcached内部通过四线程完成了master-worker模型,那也是服务端最广大的意气风发种并发模型。实际上,除了master线程和worker线程之外,memcached还大概有部分任何的支持线程(举个例子logger线程卡塔尔(قطر‎,不过与本文宗旨无关,所以那边不做描述。

前边用libevent开垦了一个流媒体服务器。用线程池完结的。之后又看了memcached的互连网有关贯彻,明日来收拾一下memcached的贯彻流程。

在主进度中有八个入线人程 主线程和reactor线程, 主线程用来管理 accept()事件,创建新的socket fd, 接纳到新连接后会把新的socket
连接放到reactor线程的的风浪监听循环中,
reactor线程负担采纳从client端发过来的数量,按左券深入分析后,通过pipe管道传输给worker进程处理,worker进程管理完后,把结果通过pipe回传给reactor线程,reactor线程再按合同把结果通过socket发送给client。
能够看看reactor线程负担数据的IO和传导,在linux景况下,那个IO事件经过epoll机制来拍卖。

master-worker线程模型

memcached有1条主线程,甚至4条woker线程。能够由此运行参数-t来钦点worker线程的数额,假若不点名,私下认可情状下正是4。简单的话,主线程肩负监听央浼,分发给worker线程。而worker线程担当选拔具体的伸手命令况且作出管理。

暗示图如下:

美高梅娱乐开户 1

那张图大约画出了主题线程之间的关系。

主线程监听到有新的连年之后,会做出一遍选拔,我们称为dispatch,其实便是规定此一而再三翻五次后续会由哪些worker线程管理。生龙活虎旦鲜明worker线程,接下去主线程会依附管道文告该worker线程。在每条worker线程的中间,都有四个连续队列。worker线程收到公告之后,会从三翻五次队列中抽取叁个老是,用于后续选择现实的下令,以至做出响应。

OK,上面只是三个大约的描述,正确的细节,大家照旧得经过解析源代码才干摸清。

memcached不一样于Redis的单进度单线程,是使用多线程的做事办法。有三个主线程,同期爱惜了两个线程池(专门的职业线程)。worker
thread专门的学业线程和main
thread主线程之间首要通过pipe来张开通讯。因为用了libevent,所以认为比Redis稍稍庞大点,未有在生育条件比较过Redis和memcached,所以也倒霉说哪些性质比对。

下一步大家看下主线程和 reactor线程数据交流的经过的源码

worker线程的创制

来看一下worker线程的创制进度,在main函数中有:

/* start up worker threads if MT mode */
memcached_thread_init(settings.num_threads);

settings.num_threads调控着worker线程的个数。前文提到能够经过-t参数改造,因为在main函数刚开端分析运转参数的风姿浪漫段中有:

case 't':
    settings.num_threads = atoi(optarg);
    if (settings.num_threads <= 0) {
        fprintf(stderr, "Number of threads must be greater than 0\n");
        return 1;
    }
    /* There're other problems when you get above 64 threads.
     * In the future we should portably detect # of cores for the
     * default.
     */
    if (settings.num_threads > 64) {
        fprintf(stderr, "WARNING: Setting a high number of worker"
                        " threads is not recommended.\n"
                        " Set this value to the number of cores in"
                        " your machine or less.\n");
    }
    break;

能够见到,最棒不用设置当先64条线程,线程风流浪漫旦太多,频仍的切换也急需支出,其余就是memcached多量使用互斥锁,可能会使得尚未抢到锁的线程处于等候情状。可是笔者并从未测验过线程数量依次增加与性格的损失比例。

接下去看memcached_thread_init函数:

void memcached_thread_init(int nthreads) {
    // 锁的初始化
    ...

    // 初始化init_lock,init_cond
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    // 锁的初始化
    ...

    // 分配LIBEVENT_THREAD的空间
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    // 初始化各线程
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {                                                 // 这里创建了管道!!!
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    // 启动各线程
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    // 等待各条线程初始化完成之后,再继续执行
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

memcached_thread_init的刚初始,是起首化一几种的锁。关于memcached中锁的选取,能够独自开生龙活虎章来汇报,源码中大约四处存在锁。那Ritter别列出了init_lock和init_cond,因为在memcached_thread_init的最后会涉及到。

主线程和行事线程皆有三个event base,概况框架如下图:

第一步要找到 swServer_start(swServer *serv卡塔尔 函数,
这些是swoole运维server的输入,在此个函数中非常轻易就会读到这般一句

LIBEVENT_THREAD

开端化各样锁之后,会为4条worker线程分配内部存款和储蓄器空间。大家能够看出,线程相关的数码被封装在LIBEVENT_THREAD结构体此中。

typedef struct {
    pthread_t thread_id;                // 线程ID
    struct event_base *base;            // 一个libevent的实例
    struct event notify_event;          // 监听的事件
    int notify_receive_fd;              // 管道的recv端
    int notify_send_fd;                 // 管道的send端
    struct thread_stats stats;          // 线程的一些状态统计
    struct conn_queue *new_conn_queue;  // 连接队列
    cache_t *suffix_cache;              /* suffix cache */
    logger *l;                          // logger
} LIBEVENT_THREAD;

在为LIBEVENT_THREAD分配好内部存款和储蓄器之后,紧接着就初叶在for循环中逐一早先化各条线程。

这里首先做的事体,是独立自主主线程和worker线程之间的管道。

// 建立管道
int fds[2];
if (pipe(fds)) {
    perror("Can't create notify pipe");
    exit(1);
}

// 设置管道两端的fd
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

                                                    
美高梅娱乐开户 2

int swServer_start(swServer *serv) { ...... ret = swServer_start_proxy; //启动主线程和reactor线程,并注册各种事件处理函数,和协议解析 ......}

setup_thread

前文提到了,当marster选用新连接之后,会使用管道布告worker。大家继续看setup_thread函数:

static void setup_thread(LIBEVENT_THREAD *me) {

    // 初始化worker线程的libevent实例
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    // 监听管道的recv端,一旦获取到marster线程的通知,会触发thread_libevent_process函数
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    // 初始化worker线程的conn_queue,即连接队列
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    // 初始化worker线程的suffix_cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

至此,LIBEVENT_THREAD中,一些字段起头化工作皆是预备安妥。

全体框架图:

上面重视讲 swServer_start_proxy

create_worker

紧接着看第一个for循环中的create_worker函数,正是运用pthread_create,真正的开创线程。

static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    // 利用ptread_create创建线程,线程执行函数为worker_libevent
    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

值得蓬蓬勃勃提的是,线程被创立时钦点的奉行函数为worker_libevent。那象征,4条worker线程被pthread真正成立之后,会进去到worker_libevent。来看看worker_美高梅娱乐开户,libevent:

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    // 初始化logger
    me->l = logger_create();
    if (me->l == NULL) {
        abort();
    }

    // 利用条件变量与主线程同步
    register_thread_initialized();

    // 一旦执行下面一句,worker线程就开始挂起,直到接收主线程通知
    event_base_loop(me->base, 0);
    return NULL;
}

其中的register_thread_initialized要与memcached_thread_init函数最终三句结合一同来看:

pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);

那是运用规范变量进行线程同步的一个很精髓的做法。

wait_for_thread_registration表示,主线程挂起在尺度变量上,只要init_count<4则主线程向来会挂起。而子线程的register_thread_initialized表示,子线程风流罗曼蒂克旦init实现,就将init_count++,并且告诉主线程,主线程随后继续推断init_count是否<4。最后,唯有当4条worker线程都init结束之后,主线程才会停止挂起,继续向下实施。

到现在,worker线程初叶化的连锁源码都曾经阅读完了。memcached的源码相对别的部分开源项目,其实照旧很清晰明了的。

                             
美高梅娱乐开户 3

//src/network/Server.cstatic int swServer_start_proxy(swServer *serv){ int ret; swReactor *main_reactor = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swReactor)); ret = swReactor_create(main_reactor, SW_REACTOR_MAXEVENTS); // 创建主线程reactor对象事件处理,只是创建对象,主线程不用pthread_create() 真正去创建线程 //...... // 省略区 ret = swReactorThread_start(serv, main_reactor); //把server监听端口的listen事件加入到主线程事件监听,创建reactor线程 //...... // 省略区 main_reactor->setHandle(main_reactor, SW_FD_LISTEN, swServer_master_onAccept); //主线程reactor listen事件处理handle //...... // 省略区 return main_reactor->wait(main_reactor, NULL); //主线程循环}int swReactorThread_start(swServer *serv, swReactor *main_reactor_ptr){//..... 省略区 swListenPort *ls; LL_FOREACH(serv->listen_list, ls) { if (ls->type == SW_SOCK_UDP || ls->type == SW_SOCK_UDP6 || ls->type == SW_SOCK_UNIX_DGRAM) { continue; } main_reactor_ptr->add(main_reactor_ptr, ls->sock, SW_FD_LISTEN); //把监听的端口fd加入到 主线程事件监听循环中 }//..... 省略区 return SW_OK;}int swServer_master_onAccept(swReactor *reactor, swEvent *event){ //..... 省略区 //SW_ACCEPT_AGAIN for (i = 0; i < SW_ACCEPT_MAX_COUNT; i++) { new_fd = accept(event->fd, (struct sockaddr *) &client_addr, &client_addrlen); // accept新socket //..... 省略区 if (sub_reactor->add(sub_reactor, new_fd, SW_FD_TCP | SW_EVENT_WRITE) < 0) //新socket加入到 reactor线程事件监听中 { bzero(conn, sizeof(swConnection)); close; return SW_OK; } } return SW_OK;}

marster线程的监听

主线程最初化完worker线程之后,会特别管理互联网连接。通过代码能够看见,memcached可以支撑二种合同:uds(unix
domain socket),tcp,udp。

// 如果指定了uds路径,则创建unix domain socket
if (settings.socketpath != NULL) {
    errno = 0;
    if (server_socket_unix(settings.socketpath, settings.access)) {
        vperror("failed to listen on UNIX socket: %s", settings.socketpath);
        exit(EX_OSERR);
    }
}

// 否则,创建tcp和udp的socket
if (settings.socketpath == NULL) {
    ...

    // tcp
    errno = 0;
    if (settings.port && server_sockets(settings.port, tcp_transport,
                                        portnumber_file)) {
        vperror("failed to listen on TCP port %d", settings.port);
        exit(EX_OSERR);
    }

    // udp
    errno = 0;
    if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                          portnumber_file)) {
        vperror("failed to listen on UDP port %d", settings.udpport);
        exit(EX_OSERR);
    }

    ...
}

memcached的启航参数-p和-U分别调控了setting.port和setting.udpport。

比方来讲,假如想只帮助tcp左券的总是,监听端口号为8888,不扶植udp连接,大家能够按如下方式运营:

memcached -p8888 -U0

OK,接下去正是现实来看socket的拍卖了。

线程模型:

  1. swReactor_create(卡塔尔(قطر‎ ,
    创制主线程reactor对象,只是创设对象,主线程不用pthread_create(State of Qatar真正去制造线程,主线程就在主进度中
  2. swReactorThread_start(卡塔尔(قطر‎ , 那个中主要分为两件事,
    三个是把server监听端口的listen事件参加到主线程事件监听中去,那样假设有连接到达监听端口就能调用主线程的
    swServer_master_onAccept(State of Qatar来拍卖新连接。
    其余风华正茂件事正是成立reactor线程并注册各样风云监听和商业事务管理3.main_reactor->setHandle(卡塔尔(قطر‎, 那么些注册主线程reactor listen事件管理handle ,
    也正是登记swServer_master_onAccept()4.main_reactor->wait(main_reactor,
    NULL卡塔尔(قطر‎;
    这几个是主线程循环,主线程一向在此个函数中循环,等待新连接,然后把新连接通过swServer_master_onAccept(卡塔尔,拿到新的socket, 再把新的socket , 通过
    sub_reactor->add(sub_reactor, new_fd, SW_FD_TCP |
    SW_EVENT_WPAJEROITE卡塔尔(قطر‎ 参与到reactor线程的事件管理循环中

server_socket

先画二个轻易易行的调用关系图:

美高梅娱乐开户 4

server_sockets函数微微有些复杂,不过其本质上是调用server_socket函数来创立一个个socket。在server_socket中,具体做到了初步化socket,绑定IP和端口,增加监听的事件等专门的工作。

咱们向来来看server_socket:

static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    // 区分udp还是tcp
    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);

    // 调用getaddrinfo来返回ai链表
    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
            perror("getaddrinfo()");
        return 1;
    }

    // 遍历ai链表,针对每个地址创建socket
    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;

        // 调用socket()接口来实际创建套接字,并且将套接字设置成“非阻塞”
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

        ...

        // 设置套接字的一些其他选项
        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }

        // 调用bind
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;

            // 调用listen
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            ...
        }

        if (IS_UDP(transport)) {
            ...
        } else {
            // 初始化conn,设置对sfd的监听事件
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            // 加入conn链表的头部
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}

这段代码即便相比较长,但实则并不复杂。追根究底,依旧是咱们耳濡目染的服务端编制程序,socket,bind,listen…等等。个中很主要的少数是利用conn_new函数来创建conn,以致安装监听事件。conn在memcached中是一个很要紧的数据布局,后文种具体解析。来看下设置监听事件:

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {
    perror("event_add");
    return NULL;
}

这段代码能够看看,大器晚成旦前方socket成立的fd有监听到央求,则会触发调用event_handler函数。event_handler是五个那么些大旨的函数,它里面维护了三个状态机,依据央浼的景观分别开展区别的管理。event_handler的切实可行贯彻大家也置于后边再看。

当主线程完结server_sockets管理以往,在main函数中,会调用event_base_loop:

/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
    retval = EXIT_FAILURE;
}

时至前天,主线程会步向所谓的event
loop,每当有client发起连接,主线程便会推行上述的event_handler。

                                       
美高梅娱乐开户 5

同过上边的几部能够领略的看见主线程是怎么着监听listen事件,又是哪些accept(State of Qatar新连接,
并把新连接插手到reactor线程的监听中的

关于conns

前文提到在sever_socket中利用conn_new函数来创建conn连接。conn结构体在memcache中扮演着很重大的剧中人物。无论是主线程的监听,依旧子线程与各个client进行相互作用管理,用到的都以那一个conn。

master线程只绑定了三个event事件,client发起连接时被触发。对应的event_handler函数真正去管理的,就是流传的conn对象。

种种woker线程会绑定多少个event事件,具体要视二个worker线程管理多少client的拜见有关。然则各样worker线程,唯有三个平地风波是与master线程有关的。那就是master选拔到client发起的接连之后,会选取pipe布告贰个worker,这几个历程大家称为“分发”。woker线程收到公告后,触发该事件,对应的函数为thread_libevent_process(参考setup_thread一小节)函数。thread_libevent_process中对于新的连接到来,也会立即用conn_new函数生成一个conn。至此,那些client与woker的保有人机联作,都是选取该conn完结。

画多少个大约的暗暗表示图:

A,有新的连年

美高梅娱乐开户 6

 

B,分发给worker1之后

美高梅娱乐开户 7

 

C,又有新的接连

美高梅娱乐开户 8

 

D,分发给worker2之后

美高梅娱乐开户 9

 

E,创设了若干连接之后

美高梅娱乐开户 10

 

各类线程包蕴主线程都各自有独立的Libevent实例,Memcached的listen
fd注册到主线程的Libevent实例上,由主线程来accept新的连天,选择新的连天后根据Round-robin算法采用专门的学业线程,将新连接的socket
fd封装为CQ_ITEM后push到所选工作线程的CQ队列中,然后主线程(notify_send_fd)通过管道发送字符“c”到办事线程(notify_receive_fd),而notify_receive_fd已经注册到办事线程的Libevent实例上了,这样专业线程就能够接到通告“c”,然后从该职业线程的CQ队列中pop出CQ_ITEM进而抽取新连接并将fd注册到办事线程的Libevent实例上,进而由专门的学业线程来拍卖该连接的有所继续事件

上边继续说reactor线程

代码层达成图:

reactor线程通过 swReactorThread_start(卡塔尔 , 调用成立

         
美高梅娱乐开户 11

//src/network/ReactorThread.c 函数中间有省略,只粘贴了关键部分int swReactorThread_start(swServer *serv, swReactor *main_reactor_ptr){ //..... 省略区 //create reactor thread for (i = 0; i < serv->reactor_num; i++) { thread = &(serv->reactor_threads[i]); param = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swThreadParam)); if (param == NULL) { swError("malloc failed"); return SW_ERR; } param->object = serv; param->pti = i; if (pthread_create(&pidt, NULL, (void *  swReactorThread_loop,  param) < 0) { swError("pthread_create[tcp_reactor] failed. Error: %s[%d]", strerror, errno); } thread->thread_id = pidt; } return SW_OK;}

                            
美高梅娱乐开户 12

pthread_create 创设reactor线程,
线程的循环函数是swReactorThread_loop

 

//src/network/ReactorThread.c 函数中间有省略,只粘贴了关键部分static int swReactorThread_loop(swThreadParam *param){ //创建线程reactor对象 swReactor *reactor = &thread->reactor; ret = swReactor_create(reactor, SW_REACTOR_MAXEVENTS); //注册pipe管道的事件处理handle reactor->setHandle(reactor, SW_FD_CLOSE, swReactorThread_onClose); reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_READ, swReactorThread_onPipeReceive); reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_WRITE, swReactorThread_onPipeWrite); //set protocol function point 添加socket read write事件监听, 并注册协议处理函数 swReactorThread_set_protocol(serv, reactor); // reactor线程循环 reactor->wait(reactor, NULL); //shutdown reactor->free; return SW_OK;}void swReactorThread_set_protocol(swServer *serv, swReactor *reactor){ //UDP Packet reactor->setHandle(reactor, SW_FD_UDP, swReactorThread_onPackage); //Write socket 可写事件处理handle reactor->setHandle(reactor, SW_FD_TCP | SW_EVENT_WRITE, swReactorThread_onWrite); //Read socket 可读事件处理handle reactor->setHandle(reactor, SW_FD_TCP | SW_EVENT_READ, swReactorThread_onRead); swListenPort *ls; //listen the all tcp port LL_FOREACH(serv->listen_list, ls) { if (swSocket_is_dgram(ls->type)) { continue; } swPort_set_protocol; //协议处理解析, 比如http、http2、websocket、mqtt等 }}

源码解读:

swPort_set_protocol 左券管理函数

1. 器重数据构造

//src/network/Port.cvoid swPort_set_protocol(swListenPort *ls){ //Thread mode must copy the data. //will free after onFinish if (ls->open_eof_check) { if (ls->protocol.package_eof_len > sizeof(ls->protocol.package_eof)) { ls->protocol.package_eof_len = sizeof(ls->protocol.package_eof); } ls->protocol.onPackage = swReactorThread_dispatch; ls->onRead = swPort_onRead_check_eof; } else if (ls->open_length_check) { if (ls->protocol.package_length_type != '\0') { ls->protocol.get_package_length = swProtocol_get_package_length; } ls->protocol.onPackage = swReactorThread_dispatch; ls->onRead = swPort_onRead_check_length; } else if (ls->open_http_protocol) { if (ls->open_websocket_protocol) { ls->protocol.get_package_length = swWebSocket_get_package_length; ls->protocol.onPackage = swWebSocket_dispatch_frame; ls->protocol.package_length_size = SW_WEBSOCKET_HEADER_LEN + SW_WEBSOCKET_MASK_LEN + sizeof; }#ifdef SW_USE_HTTP2 else if (ls->open_http2_protocol) { ls->protocol.get_package_length = swHttp2_get_frame_length; ls->protocol.package_length_size = SW_HTTP2_FRAME_HEADER_SIZE; ls->protocol.onPackage = swReactorThread_dispatch; }#endif ls->onRead = swPort_onRead_http; //**http协议处理** } else if (ls->open_mqtt_protocol) { ls->protocol.get_package_length = swMqtt_get_package_length; ls->protocol.onPackage = swReactorThread_dispatch; ls->onRead = swPort_onRead_check_length; } else if (ls->open_redis_protocol) { ls->protocol.onPackage = swReactorThread_dispatch; ls->onRead = swPort_onRead_redis; } else { ls->onRead = swPort_onRead_raw; }}

/* An item in the connection queue.
*/主要用来存款和储蓄客商socket连接的骨干消息

主线程会将客商的socket连接音讯封装成CQ_ITEM,并归入职业线程的拍卖队列中。职业线程获得主线程的pipe布告后,就能够将队列中的ITEM抽出来,创设libevent的socket读事件。
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;//socket fd
    enum conn_states  init_state;//事件类型
    int               event_flags;//libevent的flag
    int               read_buffer_size;//读取buffer的大小
    enum network_transport     transport;
    CQ_ITEM          *next;//下一个CQ_ITEM
};

/**
* The structure representing a connection into memcached.
*/
typedef struct conn conn;
struct conn {
    int    sfd;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    LIBEVENT_THREAD *thread; /* Pointer to the thread object
serving this connection */
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;//指向队列的第二个节点
    CQ_ITEM *tail;//指向队列的末段二个节点
    pthread_mutex_t lock;//叁个体系就相应三个锁
};

 

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread
*/线程ID
    struct event_base *base;    /* libevent handle this thread uses
*/线程所接受的event_base
    struct event notify_event;  /* listen event for notify pipe
*/用于监听管道读事件的event
    int notify_receive_fd;      /* receiving end of notify pipe
*/管道的读端fd
    int notify_send_fd;         /* sending end of notify pipe
*/管道的写端fd
    struct thread_stats stats;  /* Stats generated by this thread
*/
    struct conn_queue *new_conn_queue; /* queue of new
connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

2. CQ队列暗指图

发表评论

电子邮件地址不会被公开。 必填项已用*标注