感谢支持
我们一直在努力

Memcached源码学习

大致浏览了一下memcached的源码,但是并没有对相关的知识点进行总结和记录,所以很快就忘了,这次打算将memcached的源码再学习一遍,并进行总结归纳。


memcached模块化设计比较好,每个模块除了对外接口定义在头文件外,其它函数定义及实现都在源文件中,且定义为static类型,这样很好的降低了模块之间的耦合性。下面,浏览源码将按照功能模块进行划分,逐步学习总结。


memcached主要包括以下模块(不完全归纳):


内存管理机制(slab),hash,多线程及libevent事件处理机制,…


本文主要对memcached的内存管理机制进行总结,并画出相应的结构图,便于理解。


众所周知,简单的使用malloc和free,这样将产生大量的内存碎片,从而加重操作系统内存管理器的负担。memcached的内存管理机制采用了slab allocator内存分配和管理机制,以解决内存碎片问题。slab allocator基本原理是按照预先定义的大小,将内存分割为多种特定长度的trunk块,并将长度相同的trunk块归成slab组,每次请求内存时,采用最佳适应算法查询并获得一个trunk,用于保存item。


memcached中slab内存分配管理相关函数定义及实现源码全部集中在slabs.h和slabs.c中,slabs.h定义了外部模块内存操作的接口,包括的函数如下(其中最后2个函数与slab内存管理机制关联不大,后续不予讨论):


// slabs_init:初始化slab内存管理,主要完成slabclass数组中每个slabclass_t中trunk大小(内存以CHUNK_ALIGN_BYTES=8字节对齐)及每个slab中trunk数量的初始化


// 参数 limit:运行时指定的memcached可用内存大小,0表示不限制大小


// 参数 factor:增长因子


// 参数 prealloc:表示是否预分配limit内存,true:则在函数内使用malloc预分配limit大小的内存


void slabs_init(const size_t limit, const double factor, const bool prealloc) ;


// slabs_clsid:返回size大小对应的slabclass索引clsid,即size大小的trunk将放入slabclass[clsid]中,0表示对象太大


unsigned int slabs_clsid(const size_t size) ;


// slabs_alloc:从slabclass[id]中分配一个size大小的trunk,错误时返回NULL(0)


void *slabs_alloc(const size_t size, unsigned int id) ;


// slabs_free:将ptr指向的大小为size的内存区域加入slabclass[id]的空闲内存块数组(freelist)中


void slabs_free(void *ptr, size_t size, unsigned int id) ;


// 调整slabclass[id]的requested值:requested = requested – old + ntotal


void slabs_adjust_mem_requested(unsigned int id, size_t old, size_t ntotal) ;


// 返回状态信息()


bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c) ;


slabs.c中定义了memcached中slab allocator实现代码,下面首先介绍使用的数据结构,然后介绍相关的实现。


•数据结构


memcached定义slabclass数组用来管理内存:


slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];


memcached的slab内存管理机制最主要的数据结构为struct slabclass_t,定义如下:



  1. typedef struct {  
  2.     unsigned int size;      /* sizes of items */  
  3.     unsigned int perslab;   /* how many items per slab */  
  4.   
  5.     void **slots;           /* list of item ptrs */  
  6.     unsigned int sl_total;  /* size of previous array */  
  7.     unsigned int sl_curr;   /* first free slot */  
  8.   
  9.     void *end_page_ptr;         /* pointer to next free item at end of page, or 0 */  
  10.     unsigned int end_page_free; /* number of items remaining at end of last alloced page */  
  11.   
  12.     unsigned int slabs;     /* how many slabs were allocated for this class */  
  13.   
  14.     void **slab_list;       /* array of slab pointers */  
  15.     unsigned int list_size; /* size of prev array */  
  16.   
  17.     unsigned int killing;  /* index+1 of dying slab, or zero if none */  
  18.     size_t requested; /* The number of requested bytes */  
  19. } slabclass_t;  

其中,size为slabclass_t中每个trunk的大小,perslab为每个slab包含的trunk数;


slots为memcached中空闲trunk块指针数组(或列表,以下使用数组),sl_total为已分配的slots数组大小,sl_curr为当前可用的slots数组索引;


slab_list为此slabclass_t中的slab指针数组,list_size为slab_list指针数组已分配的大小,slabs为当前已使用的slab_list指针数组数量,end_page_ptr和end_page_free分别为当前的slab中trunk的起始位置和trunk可用数量;


killing不确定,requested为已使用的内存大小。


memcached的slab数据结构如下图所示(图中实箭头表示指针,小箭头表示索引或数量):


•实现介绍(函数介绍过程中,结合上图理解起来更容易)



下面将对主要的代码进行解析:



  1. /* 
  2.  * Figures out which slab class (chunk size) is required to store an item of 
  3.  * a given size. 
  4.  * 
  5.  * Given object size, return id to use when allocating/freeing memory for object 
  6.  * 0 means error: can’t store such a large object 
  7.  */  
  8.   
  9. unsigned int slabs_clsid(const size_t size) {  
  10.     int res = POWER_SMALLEST;  
  11.   
  12.     if (size == 0)  
  13.         return 0;  
  14.     // 从后向前遍历slabclass数组,找到最适合放入size大小的slabclass_t的索引   
  15.     while (size > slabclass[res].size)  
  16.         if (res++ == power_largest)     /* won’t fit in the biggest slab */  
  17.             return 0;  
  18.     return res;  
  19. }  

 



  1. /** 
  2.  * Determines the chunk sizes and initializes the slab class descriptors 
  3.  * accordingly. 
  4.  */  
  5. void slabs_init(const size_t limit, const double factor, const bool prealloc) {  
  6.     int i = POWER_SMALLEST – 1;  
  7.     unsigned int size = sizeof(item) + settings.chunk_size;    // 初始化trunk大小   
  8.   
  9.     mem_limit = limit;  
  10.   
  11.     // 指定为预分配内存,则一次行分配全部内存(limit大小)   
  12.     if (prealloc) {  
  13.         /* Allocate everything in a big chunk with malloc */  
  14.         mem_base = malloc(mem_limit);  
  15.         if (mem_base != NULL) {  
  16.             mem_current = mem_base;  
  17.             mem_avail = mem_limit;  
  18.         } else {  
  19.             fprintf(stderr, “Warning: Failed to allocate requested memory in”  
  20.                     ” one large chunk.\nWill allocate in smaller chunks\n”);  
  21.         }  
  22.     }  
  23.   
  24.     memset(slabclass, 0, sizeof(slabclass));  
  25.     // 初始化每个slabclass_t的trunk大小和每个slab中trunk数量   
  26.     // slabclass中每个slabclass_t的trunk大小增长为factor倍   
  27.     // 注意 i 从索引 1 开始   
  28.     while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {  
  29.         /* Make sure items are always n-byte aligned */  
  30.         if (size % CHUNK_ALIGN_BYTES)                             // 内存8字节对齐   
  31.             size += CHUNK_ALIGN_BYTES – (size % CHUNK_ALIGN_BYTES);  
  32.   
  33.         slabclass[i].size = size;  
  34.         slabclass[i].perslab = settings.item_size_max / slabclass[i].size;  
  35.         size *= factor;  
  36.         if (settings.verbose > 1) {  
  37.             fprintf(stderr, “slab class %3d: chunk size %9u perslab %7u\n”,  
  38.                     i, slabclass[i].size, slabclass[i].perslab);  
  39.         }  
  40.     }  
  41.   
  42.     // slabclass中最后一个slabclass_t的trunk大小设置为最大item大小   
  43.     power_largest = i;  
  44.     slabclass[power_largest].size = settings.item_size_max;  
  45.     slabclass[power_largest].perslab = 1;  
  46.     if (settings.verbose > 1) {  
  47.         fprintf(stderr, “slab class %3d: chunk size %9u perslab %7u\n”,  
  48.                 i, slabclass[i].size, slabclass[i].perslab);  
  49.     }  
  50.     ….// 省略   
  51. }  

下面是我抓取的系统初始化trunk列表(CentOS6.0-64bit,memcached版本为1.4.7,factor默认为1.25):




  1. // 初始化或增大slab_list指针数组   
  2. static int grow_slab_list (const unsigned int id) {  
  3.     slabclass_t *p = &slabclass[id];  
  4.     // slabclass_t中已经分配的slabs数量与slab指针数组的大小相同,表示已满,如<span style=”color:#FF0000;”>下图</span>所示   
  5.     // 则,重新分配slab指针数组,指针数组增大为以前的2倍或初始化为16   
  6.     if (p->slabs == p->list_size) {  
  7.         size_t new_size =  (p->list_size != 0) ? p->list_size * 2 : 16;  
  8.         void *new_list = realloc(p->slab_list, new_size * sizeof(void *));  
  9.         if (new_list == 0) return 0;  
  10.         p->list_size = new_size;  
  11.         p->slab_list = new_list;  
  12.     }  
  13.     return 1;  
  14. }  



  1. // 初始化或重新分配一个slabclass[id]中的slab(每个slab包含perslab个trunk,每个trunk大小为size),<span style=”color:#FF0000;”>见下图</span>!   
  2. static int do_slabs_newslab(const unsigned int id) {  
  3.     slabclass_t *p = &slabclass[id];  
  4.     int len = p->size * p->perslab; // 每个trunk的size * 每个slab中trunk数量   
  5.     char *ptr;  
  6.   
  7.     // 第一次未分配时,p->slabs==0, mem_malloced==0   
  8.     // 如果已经分配过,mem_malloced + len > mem_limit表示超过定义的内存   
  9.     if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) ||  
  10.         (grow_slab_list(id) == 0) ||  // 如果slabs指针数组满了或未初始化,   
  11.                                       // 则增大slabs指针数组的大小(2倍或初始化为16)   
  12.         ((ptr = memory_allocate((size_t)len)) == 0)) {  // 调用malloc分配len大小内存或调整当前指针(预分配时)   
  13.   
  14.         MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);  
  15.         return 0;  
  16.     }  
  17.   
  18.     memset(ptr, 0, (size_t)len);  
  19.     p->end_page_ptr = ptr;              // 当前slab可用trunk起始地址   
  20.     p->end_page_free = p->perslab;      // 当前slab可用的trunk数量   
  21.   
  22.     p->slab_list[p->slabs++] = ptr;     // 将分配的slab(trunk列表),放到slabs数组中   
  23.     mem_malloced += len;  
  24.     MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);  
  25.   
  26.     return 1;  
  27. }      

 



  1. /* 分配一个trunk数据结构,过程见<span style=”color:#FF0000;”>下图</span> */  
  2. static void *do_slabs_alloc(const size_t size, unsigned int id) {  
  3.     slabclass_t *p;  
  4.     void *ret = NULL;  
  5.   
  6.     // 索引非法   
  7.     if (id < POWER_SMALLEST || id > power_largest) {  
  8.         MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);  
  9.         return NULL;  
  10.     }  
  11.   
  12.     p = &slabclass[id];  
  13.     assert(p->sl_curr == 0 || ((item *)p->slots[p->sl_curr – 1])->slabs_clsid == 0);  
  14.   
  15. #ifdef USE_SYSTEM_MALLOC   
  16.     if (mem_limit && mem_malloced + size > mem_limit) {  
  17.         MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);  
  18.         return 0;  
  19.     }  
  20.     mem_malloced += size;  
  21.     ret = malloc(size);  
  22.     MEMCACHED_SLABS_ALLOCATE(size, id, 0, ret);  
  23.     return ret;  
  24. #endif   
  25.   
  26.     /* fail unless we have space at the end of a recently allocated page, 
  27.        we have something on our freelist, or we could allocate a new page */  
  28.     if (! (p->end_page_ptr != 0 || p->sl_curr != 0 ||  
  29.            do_slabs_newslab(id) != 0)) {  
  30.         /* We don’t have more memory available */  
  31.         ret = NULL;  
  32.     } else if (p->sl_curr != 0) {       // freelist非空,优先从freelist分配   
  33.         /* return off our freelist */  
  34.         ret = p->slots[–p->sl_curr];  
  35.     } else {                            // 刚分配的   
  36.         /* if we recently allocated a whole page, return from that */  
  37.         assert(p->end_page_ptr != NULL);  
  38.         ret = p->end_page_ptr;  
  39.         if (–p->end_page_free != 0) {  
  40.             p->end_page_ptr = ((caddr_t)p->end_page_ptr) + p->size;  
  41.         } else {  
  42.             p->end_page_ptr = 0;  
  43.         }  
  44.     }  
  45.   
  46.     if (ret) {  
  47.         p->requested += size;  
  48.         MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);  
  49.     } else {  
  50.         MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);  
  51.     }  
  52.   
  53.     return ret;  
  54. }  

do_slabs_newslab函数初始化时,end_page_ptr指向slab的起始位置,end_page_free等于perslab;


do_slabs_alloc函数每次分配一个trunk(假设此时freelist为空),则end_page_ptr指向下一位置,end_page_free减1,直到分配完毕;后续申请,则新建一个slab(do_slabs_newslab函数的ptr = memory_allocate((size_t)len))。


初始化一个slab和分配trunk的过程图:




  1. // 释放trunk结构(将其放入freelist指针数组),结合“数据结构”部分图<span style=”color:#333333;”>可以更好的了解这个过程   
  2. static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {  
  3.     slabclass_t *p;  
  4.   
  5.     assert(((item *)ptr)->slabs_clsid == 0);  
  6.     assert(id >= POWER_SMALLEST && id <= power_largest);  
  7.     if (id < POWER_SMALLEST || id > power_largest)  
  8.         return;  
  9.   
  10.     MEMCACHED_SLABS_FREE(size, id, ptr);  
  11.     p = &slabclass[id];  
  12.   
  13. #ifdef USE_SYSTEM_MALLOC   
  14.     mem_malloced -= size;  
  15.     free(ptr);  
  16.     return;  
  17. #endif   
  18.   
  19.     // 增加freelist指针数组大小为2倍或初始化为16   
  20.     if (p->sl_curr == p->sl_total) { /* need more space on the free list */  
  21.         int new_size = (p->sl_total != 0) ? p->sl_total * 2 : 16;  /* 16 is arbitrary */  
  22.         void **new_slots = realloc(p->slots, new_size * sizeof(void *));  
  23.         if (new_slots == 0)  
  24.             return;  
  25.         p->slots = new_slots;  
  26.         p->sl_total = new_size;  
  27.     }  
  28.     p->slots[p->sl_curr++] = ptr;   // 将ptr指向的trunk放入freelist指针数组   
  29.     p->requested -= size;  
  30.     return;  

对于slabs_alloc和slabs_free只是使用slabs_lock互斥锁,控制多线程对临界区资源的访问,分别调用了上述的do_slabs_alloc和do_slabs_free函数,这里不做过多解释。


内存管理模块对其它模块的接口主要有:slabs_init、slabs_alloc、slabs_free和slabs_clsid。


slabs_init在main函数中初始化部分调用,slabs_clsid和slabs_alloc在do_item_alloc函数中,每次存入一个item申请内存时调用slabs_clsid获得item对应大小的slabclass_t的索引clsid,然后通过clsid调用slabs_alloc函数分配一个trunk(一个item保存在一个trunk中),slabs_free在item_free函数中,释放item时调用,将item所在的trunk放入slabclass[clsid]的空闲trunk块指针数组(slots)中。


到此,slab部分介绍完毕,有什么高见敬请指教。

 今天来介绍memcached中hashtable部分的源码,hash部分的源码主要分布在assoc.h/c、hash.h/c中,总得来说代码比较简单,这里就稍微介绍一下。


hashtable通常包括哈希函数和解决冲突的方法两个最主要的因素,memcached使用的哈希函数为Bob Jenkins在1996年发明的,定义位于hash.h中,实现在hash.c中,作者与2006年时提出另一个新的hash算法,其具有更快的速度(近2倍)和更大的吞吐量,详情请参照 这里 ,这里不进行介绍了。


解决冲突的方法,memcached中采用了链地址法(或拉链法),从数据结构书中截取的一个采用链地址法(哈希函数为:key MOD 13)解决冲突的示意图:



所不同的是,memcached中定义了primary_hashtable和old_hashtable,当hashtable的填装因子(memcached中硬编码为 3/2,无法确定如何定值的),assoc_maintenance_thread线程会将old_hashtable中的items以hash_bulk_move个buckets为单位,逐步移到primary_hashtable中。


对外接口:


// 完成primary_hashtable的初始化


void assoc_init(void);


// 根据key和nkey查找对应的item


item *assoc_find(const char *key, const size_t nkey);


// 将item插入hashtable中


int assoc_insert(item *item);


// 从hashtable中删除key对应的item


void assoc_delete(const char *key, const size_t nkey);


// 下面两个函数分别为启动和结束hashtable维护的线程,如果不需要这个功能,可以不调用,就是会浪费primary_hashtable指针数组占用的内存资源


int start_assoc_maintenance_thread(void);


void stop_assoc_maintenance_thread(void);


void do_assoc_move_next_bucket(void); // 函数没有实现


实现部分:


首先介绍几个用到的变量:


static pthread_cond_t maintenance_cond; // 同步insert和hashtable维护线程的条件变量


static unsigned int hashpower = 16;


#define hashsize(n) ((ub4)1<<(n))              //hashtable初始化大小设置为2^16


#define hashmask(n) (hashsize(n)-1)         // 初始hashmask=0x1111 1111 1111 1111,用来将哈希函数计算的结果映射到hashsize域内,hashmask二进制值永远是hashpower位1的串


static unsigned int hash_items = 0;            // hashtable中存在的item数量


static bool expanding = false;                     // 是否正在扩展的flag


static unsigned int expand_bucket = 0;      // 当前扩展的位置(old_hashtable中的索引)



  1. // hashtable初始化   

  2. void assoc_init(void) {  

  3.     // 分配hashsize个buckets,每个bucket就是一个单向链表或null   

  4.     primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));  

  5.     if (! primary_hashtable) {  

  6.         fprintf(stderr, “Failed to init hashtable.\n”);  

  7.         exit(EXIT_FAILURE);  

  8.     }  

  9. }  

 



  1. 根据key和nkey查找对应的item,不存在返回NULL  

  2. item *assoc_find(const char *key, const size_t nkey) {  

  3.     uint32_t hv = hash(key, nkey, 0);  

  4.     item *it;  

  5.     unsigned int oldbucket;  

  6.   

  7.     // 如果正在扩展中,且hash值映射到尚未移入新primary_hashtable中的item,在old_hashtable查找,   

  8.     // 当前即将扩展索引expand_bucket位置的bucket,大于等于expand_bucket表示还在old_hashtable中   

  9.     if (expanding &&  

  10.         (oldbucket = (hv & hashmask(hashpower – 1))) >= expand_bucket)  

  11.     {  

  12.         it = old_hashtable[oldbucket];  

  13.     } else {  

  14.         it = primary_hashtable[hv & hashmask(hashpower)];  

  15.     }  

  16.   

  17.     item *ret = NULL;  

  18.     int depth = 0;  

  19.     while (it) {  

  20.         // 遍历单向链表,查找相应的item   

  21.         if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {  

  22.             ret = it;  

  23.             break;  

  24.         }  

  25.         it = it->h_next;  

  26.         ++depth;  

  27.     }  

  28.     MEMCACHED_ASSOC_FIND(key, nkey, depth);  

  29.     return ret;  

  30. }  

 



  1. /* returns the address of the item pointer before the key.  if *item == 0, 

  2.    the item wasn’t found */  

  3.   

  4. static item** _hashitem_before (const char *key, const size_t nkey) {  

  5.     uint32_t hv = hash(key, nkey, 0);  

  6.     item **pos;  

  7.     unsigned int oldbucket;  

  8.   

  9.     if (expanding &&  

  10.         (oldbucket = (hv & hashmask(hashpower – 1))) >= expand_bucket)  

  11.     {  

  12.         pos = &old_hashtable[oldbucket];  

  13.     } else {  

  14.         pos = &primary_hashtable[hv & hashmask(hashpower)];  

  15.     }  

  16.   

  17.     while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {  

  18.         pos = &(*pos)->h_next;  

  19.     }  

  20.     return pos;  

  21. }  

比较函数assoc_find和_hashitem_before,发现两者只是返回值不同,assoc_find使用了item指针(item*),而_hashitem_before使用了item指针的指针(item**),查找过程都是一样的,_hashitem_before只是在assoc_delete中调用,返回指针的指针,方便直接修改指向的地址,从而删除此item;使用指针同样可以达到效果,但是必须修改指向的item的值,相比较前者效率更高。



  1. /* Note: this isn’t an assoc_update.  The key must not already exist to call this */  

  2. // 上面英文注释已经说的很清楚了,插入的item的key必须在hashtable中不存在,否则assert错误   

  3. int assoc_insert(item *it) {  

  4.     uint32_t hv;  

  5.     unsigned int oldbucket;  

  6.   

  7.     assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn’t have duplicately named things defined */  

  8.   

  9.     // 计算hash值   

  10.     hv = hash(ITEM_key(it), it->nkey, 0);  

  11.     if (expanding &&  

  12.         (oldbucket = (hv & hashmask(hashpower – 1))) >= expand_bucket)  

  13.     {  

  14.         // 如果正在展开hashtable过程中,则将映射到expand_bucket之后的,   

  15.         // 即还没有展开的bucket部分,继续放在old_hashtable中,   

  16.         // 这样,当查询这些item时,从old_hashtable能够获得,参见assoc_find   

  17.         it->h_next = old_hashtable[oldbucket];  

  18.         old_hashtable[oldbucket] = it;  

  19.     } else {  

  20.         // 链表操作,放入链表起始位置   

  21.         it->h_next = primary_hashtable[hv & hashmask(hashpower)];   

  22.         primary_hashtable[hv & hashmask(hashpower)] = it;  

  23.     }  

  24.   

  25.     hash_items++;  

  26.     if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) { //3、2不知道怎么定的   

  27.         assoc_expand(); // 启动扩展   

  28.     }  

  29.   

  30.     MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);  

  31.     return 1;  

  32. }  

 



  1. // 从hashtable中删除键值为key的item   

  2. void assoc_delete(const char *key, const size_t nkey) {  

  3.     // 返回值是键值为key的item的指针的指针,感觉before用的不太恰当,开始以为是指向它前面的一个item   

  4.     item **before = _hashitem_before(key, nkey);  

  5.   

  6.     if (*before) {  

  7.         item *nxt;  

  8.         hash_items–;  

  9.         /* The DTrace probe cannot be triggered as the last instruction 

  10.          * due to possible tail-optimization by the compiler 

  11.          */  

  12.         MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);  

  13.         nxt = (*before)->h_next;  

  14.         (*before)->h_next = 0;   /* probably pointless, but whatever. */  

  15.         *before = nxt;  

  16.         return;  

  17.     }  

  18.     /* Note:  we never actually get here.  the callers don’t delete things 

  19.        they can’t find. */  

  20.     assert(*before != 0);  

  21. }  

 



  1. // 扩展hashtable(这里只分配新的hashtable指针数组,设置相应的标志等,   

  2. // 真正的扩展在assoc_maintenance_thread线程中完成)   

  3. /* grows the hashtable to the next power of 2. */  

  4. static void assoc_expand(void) {  

  5.     old_hashtable = primary_hashtable;  

  6.   

  7.     // 扩大为原来的2倍   

  8.     primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));  

  9.     if (primary_hashtable) {  

  10.         if (settings.verbose > 1)  

  11.             fprintf(stderr, “Hash table expansion starting\n”);  

  12.         hashpower++;  

  13.         expanding = true;  

  14.         expand_bucket = 0;  

  15.         pthread_cond_signal(&maintenance_cond); // 设置条件变量   

  16.     } else {  

  17.         primary_hashtable = old_hashtable;  

  18.         /* Bad news, but we can keep running. */  

  19.     }  

  20. }  

 



  1. // 创建hashtable维护线程   

  2. int start_assoc_maintenance_thread() {  

  3.     int ret;  

  4.     // 如果环境变量MEMCACHED_HASH_BULK_MOVE设置,则使用此设置值   

  5.     // 维护线程中,每次扩展的粒度(每次hash_bulk_move个buckets)   

  6.     char *env = getenv(“MEMCACHED_HASH_BULK_MOVE”);  

  7.     if (env != NULL) {  

  8.         hash_bulk_move = atoi(env);  

  9.         if (hash_bulk_move == 0) {  

  10.             hash_bulk_move = DEFAULT_HASH_BULK_MOVE;  

  11.         }  

  12.     }  

  13.     if ((ret = pthread_create(&maintenance_tid, NULL,  

  14.                               assoc_maintenance_thread, NULL)) != 0) {  

  15.         fprintf(stderr, “Can’t create thread: %s\n”, strerror(ret));  

  16.         return -1;  

  17.     }  

  18.     return 0;  

  19. }  

 



  1. // 停止hashtable维护线程   

  2. void stop_assoc_maintenance_thread() {  

  3.     pthread_mutex_lock(&cache_lock);  

  4.     do_run_maintenance_thread = 0;          // 结束标志   

  5.     pthread_cond_signal(&maintenance_cond);  

  6.     pthread_mutex_unlock(&cache_lock);  

  7.   

  8.     /* Wait for the maintenance thread to stop */  

  9.     pthread_join(maintenance_tid, NULL);  

  10. }  

 



  1. #define DEFAULT_HASH_BULK_MOVE 1   

  2. int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;     // 数据转移粒度,即每次移动的bucket数量   

  3.   

  4. // hashtable维护线程(hashtable扩展使用)   

  5. static void *assoc_maintenance_thread(void *arg) {  

  6.   

  7.     // 维护线程运行标志   

  8.     while (do_run_maintenance_thread) {  

  9.         int ii = 0;  

  10.   

  11.         /* Lock the cache, and bulk move multiple buckets to the new 

  12.          * hash table. */  

  13.         pthread_mutex_lock(&cache_lock);  // 互斥访问old_hashtable   

  14.   

  15.         // 每次扩展hash_bulk_move(这里定义为1)个buckets到新的hashtable中   

  16.         for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {  

  17.             item *it, *next;  

  18.             int bucket;  

  19.   

  20.             // expand_bucket: 当前正在扩展的bucket索引   

  21.             for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {  

  22.                 next = it->h_next;  

  23.   

  24.                 bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);  

  25.                 it->h_next = primary_hashtable[bucket];  

  26.                 primary_hashtable[bucket] = it;  

  27.             }  

  28.   

  29.             old_hashtable[expand_bucket] = NULL;  

  30.   

  31.             // 到达old_hashtable结尾,则扩展结束   

  32.             expand_bucket++;  

  33.             if (expand_bucket == hashsize(hashpower – 1)) {  

  34.                 expanding = false;  

  35.                 free(old_hashtable);  

  36.                 if (settings.verbose > 1)  

  37.                     fprintf(stderr, “Hash table expansion done\n”);  

  38.             }  

  39.         }  

  40.           

  41.         if (!expanding) {  

  42.             /* We are done expanding.. just wait for next invocation */  

  43.             pthread_cond_wait(&maintenance_cond, &cache_lock);  

  44.         }  

  45.   

  46.         pthread_mutex_unlock(&cache_lock);  

  47.     }  

  48.     return NULL;  

  49. }  

查找的性能分析


从hashtable的构建和查找过程可见:


1.虽然hashtable在关键字和记录的存储位置之间建立了直接映像,但由于“冲突”的产生,hashtable的查找过程仍然是一个给定值和关键字相比较的过程,因此通常以平均查找长度作为衡量hashtable的度量。


2.查找过程中与关键字进行比较的次数通常取决于三个因素:哈希函数,解决冲突的方法和哈希表的填装因子。


哈希函数的好坏首先影响出现冲突的频繁程度,但是,对于“均匀”的哈希函数可以假定:不同的哈希函数对于同一组随机的关键字,产生冲突的可能性相同,因为一般情况下都假定哈希函数是均匀的,则不考虑它对平均查找长度的影响。


采用不同的处理冲突的方法,它们的平均查找长度也不同,通常处理冲突方法相同的hashtable,其平均查找长度依赖于哈希表的装填因子:


装填因子a = 表中填入的记录数/哈希表长度


哈希表装填因子a表明哈希表的装满程度,一般越小发生冲突的可能性也就越小,反之,冲突可能性越大,查找过程中与关键字比较的次数越多。


平均查找长度一般符合下式(来自《数据结构》书籍):



由此可见hashtable的平均查找长度是a的函数,而不是n的函数,由此,不管n多大,总可以选择一个合适的装填因子a以便将平均查找长度限定在一个范围内。


程序中hashtable扩展时出现的3/2就是装填因子,不确定具体的值是怎么选取的。


研究开源软件一方面是学习,另一方是应用,memcached的hashtable模块化设计非常好,只要稍加改动就可以应用了。


第一个就是item结构体的定义,在hashtable内部只用到了item结构体的nkey,h_next字段和ITEM_key宏,保留前2个字段,并改写ITEM_key宏就可以忽略memcached的协议相关的设计;


第二个就是pthread_mutex_t cache_lock定义,用于assoc_*函数的互斥操作hashtable;


第三个是#define ENDIAN_LITTLE 1宏定义,由于hash函数采用了位操作,所以必须定义;


其它的就是trace.h中定义的宏和一些基本变量的别名定义,如uint8_t,uint16_t,uint32_t等;


最后一定要将上述的定义include进入hash.c和assoc.c中。

不过首先奉上我自己画的一张图,就称为memcached多线程交互的活动图吧,通过此图就基本掌握了main thread与单个worker thread的交互过程,图中序号表示基本的处理流程(图中driver_machine拼写错误,应为drive_machine,特此更正)。



下图为状态机,红色部分由具体的conn的运行状态决定:



main thread在监听socket上注册EV_READ事件时,TCP设置conn状态为conn_listening,所以每次有新的连接请求时,因为conn_listening不存在向其它状态转换的路径,所以它只调用drive_machine中的conn_listening处理分支,其并不会执行其它部分;UDP直接创建一个状态为conn_read的conn,将其放入worker thread的new_conn_queue队列中。


conn_listening处理中接收的新连接状态设置为conn_new_cmd。所以worker thread接收数据的的处理过程从conn_new_cmd部分代码开始,正常的流程为:conn_new_cmd->conn_parse_cmd,之后process_command处理各种不同的cmd(参照memcached协议)。


不知道是有些状态用不到,还是状态机没有画完全,存在不可到达状态。


【由于memcached的版本不同,下面会存在不同,不过基本流程相同】


先看下memcahced启动时线程处理的流程



memcached的多线程主要是通过实例化多个libevent实现的,分别是一个主线程和n个workers线程。


无论是主线程还是workers线程全部通过libevent异步事件模型来管理网络事件,实际上每个线程都是一个单独的libevent实例。


主线程负责监听客户端建立连接的请求,以及accept建立连接;


workers线程负责处理已经建立好的连接的读写等事件。


先看一下大致的图示:




首先看下主要的数据结构(thread.c):




  1. /* An item in the connection queue. */  

  2. typedef struct conn_queue_item CQ_ITEM;  

  3. struct conn_queue_item {  

  4.     int     sfd;  

  5.     int     init_state;  

  6.     int     event_flags;  

  7.     int     read_buffer_size;  

  8.     int     is_udp;  

  9.     CQ_ITEM *next;  

  10. };  

CQ_ITEM 实际上是主线程accept后返回的已建立连接的fd的封装 



  1. /* A connection queue. */  

  2. typedef struct conn_queue CQ;  

  3. struct conn_queue {  

  4.     CQ_ITEM *head;  

  5.     CQ_ITEM *tail;  

  6.     pthread_mutex_t lock;  

  7.     pthread_cond_t  cond;  

  8. };  

CQ是一个管理CQ_ITEM的单向链表


  1. typedef struct {  

  2.     pthread_t thread_id;        /* unique ID of this thread */  

  3.     struct event_base *base;    /* libevent handle this thread uses */  

  4.     struct event notify_event;  /* listen event for notify pipe */  

  5.     int notify_receive_fd;      /* receiving end of notify pipe */  

  6.     int notify_send_fd;         /* sending end of notify pipe */  

  7.     CQ  new_conn_queue;         /* queue of new connections to handle */  

  8. } LIBEVENT_THREAD; 

这是memcached里的线程结构的封装,可以看到每个线程都包含一个CQ队列,一条通知管道pipe


和一个libevent的实例event_base。


另外一个重要的最重要的结构是对每个网络连接的封装conn



  1. typedef struct{  

  2.   int sfd;  

  3.   int state;  

  4.   struct event event;  

  5.   short which;  

  6.   char *rbuf;  

  7.   … //这里省去了很多状态标志和读写buf信息等  

  8. }conn;  

memcached主要通过设置/转换连接的不同状态,来处理事件(核心函数是drive_machine)


下面看下线程的初始化流程:


在memcached.c的main函数中,首先对主线程的libevent做了初始化



  1. /* initialize main thread libevent instance */  

  2.  main_base = event_init();  

然后初始化所有的workers线程,并启动,启动过程细节在后面会有描述



  1. /* start up worker threads if MT mode */  

  2. thread_init(settings.num_threads, main_base);  

接着主线程调用(这里只分析tcp的情况,目前memcached支持udp方式)


server_socket(settings.port, 0)  


这个方法主要是封装了创建监听socket,绑定地址,设置非阻塞模式并注册监听socket的


libevent 读事件等一系列操作


然后主线程调用



  1. /* enter the event loop */  

  2. event_base_loop(main_base, 0);  

这时主线程启动开始通过libevent来接受外部连接请求,整个启动过程完毕


下面看看thread_init是怎样启动所有workers线程的,看一下thread_init里的核心代码



  1. void thread_init(int nthreads, struct event_base *main_base) {  

  2.  //。。。省略  

  3.    threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);  

  4.     if (! threads) {  

  5.         perror(“Can’t allocate thread descriptors”);  

  6.         exit(1);  

  7.     }  

  8.  

  9.     // 原作者的memcached版本为1.2.6,不知道什么原因,这里存在错误,请以最新代码为准

  10.     threads[0].base = main_base;  

  11.     threads[0].thread_id = pthread_self();  

  12.     // i从0开始,会冲掉上面的赋值,下面红色部分解释。

  13.     for (i = 0; i < nthreads; i++) {  

  14.         int fds[2];  

  15.         if (pipe(fds)) {  

  16.             perror(“Can’t create notify pipe”);  

  17.             exit(1);  

  18.         }  

  19.   

  20.         threads[i].notify_receive_fd = fds[0];  

  21.         threads[i].notify_send_fd = fds[1];  

  22.   

  23.     setup_thread(&threads[i]);  

  24.     }  

  25.   

  26.     /* Create threads after we’ve done all the libevent setup. */  

  27.     for (i = 1; i < nthreads; i++) {  

  28.         create_worker(worker_libevent, &threads[i]);  

  29.     }  

  30. }  

threads的声明是这样的


static LIBEVENT_THREAD *threads;


thread_init首先malloc线程的空间,然后第一个threads作为主线程,其余都是workers线程


然后为每个线程创建一个pipe,这个pipe被用来作为主线程通知workers线程有新的连接到达


看下setup_thread



  1. static void setup_thread(LIBEVENT_THREAD *me) {  

  2.     if (! me->base) {  

  3.         me->base = event_init();  

  4.         if (! me->base) {  

  5.             fprintf(stderr, “Can’t allocate event base\n”);  

  6.             exit(1);  

  7.         }  

  8.     }  

  9.   

  10.     /* Listen for notifications from other threads */  

  11.     event_set(&me->notify_event, me->notify_receive_fd,  

  12.               EV_READ | EV_PERSIST, thread_libevent_process, me);  

  13.     event_base_set(me->base, &me->notify_event);  

  14.   

  15.     if (event_add(&me->notify_event, 0) == -1) {  

  16.         fprintf(stderr, “Can’t monitor libevent notify pipe\n”);  

  17.         exit(1);  

  18.     }  

  19.   

  20.     cq_init(&me->new_conn_queue);  

  21. }  

setup_thread主要是创建所有workers线程的libevent实例(主线程的libevent实例在main函数中已经建立)


由于之前 threads[0].base = main_base;所以第一个线程(主线程)在这里不会执行event_init()


注:我看的memcached-1.4.7源码中,main thread线程不是放在threads[0]位置的,而是在全局变量dispatcher_thread中保存,threads为全部的worker线程,其实对程序的理解影响并不大。


然后就是注册所有workers线程的管道读端的libevent的读事件,等待主线程的通知;


最后在该方法里将所有的workers的CQ初始化了。


create_worker实际上就是真正启动了线程,pthread_create调用worker_libevent方法,该方法执行


event_base_loop启动该线程的libevent。


这里我们需要记住每个workers线程目前只在自己线程的管道的读端有数据时可读时触发,并调用


thread_libevent_process方法


看一下这个函数



  1. static void thread_libevent_process(int fd, short which, void *arg){  

  2.     LIBEVENT_THREAD *me = arg;  

  3.     CQ_ITEM *item;  

  4.     char buf[1];  

  5.   

  6.     if (read(fd, buf, 1) != 1)  

  7.         if (settings.verbose > 0)  

  8.             fprintf(stderr, “Can’t read from libevent pipe\n”);  

  9.   

  10.     item = cq_peek(&me->new_conn_queue);  

  11.   

  12.     if (NULL != item) {  

  13.         conn *c = conn_new(item->sfd, item->init_state, item->event_flags,  

  14.                            item->read_buffer_size, item->is_udp, me->base);  

  15.         。。。//省略  

  16.     }  

  17. }  

函数参数的fd是这个线程的管道读端的描述符


首先将管道的1个字节通知信号读出(这是必须的,在水平触发模式下如果不处理该事件,则会被循环通知,直到事件被处理)


cq_peek是从该线程的CQ队列中取队列头的一个CQ_ITEM,这个CQ_ITEM是被主线程丢到这个队列里的,item->sfd是已经建立的连接的描述符,通过conn_new函数为该描述符注册libevent的读事件,me->base是代表自己的一个线程结构体,就是说对该描述符的事件处理交给当前这个workers线程处理,conn_new方法的最重要的内容是:



  1. conn *conn_new(const int sfd, const int init_state, const int event_flags,  

  2.                 const int read_buffer_size, const bool is_udp, struct event_base *base) {  

  3.     。。。  

  4.             event_set(&c->event, sfd, event_flags, event_handler, (void *)c);  

  5.         event_base_set(base, &c->event);  

  6.         c->ev_flags = event_flags;  

  7.         if (event_add(&c->event, 0) == -1) {  

  8.         if (conn_add_to_freelist(c)) {  

  9.             conn_free(c);  

  10.         }  

  11.         perror(“event_add”);  

  12.         return NULL;  

  13.         }  

  14.     。。。  

  15. }  

可以看到新的连接被注册了一个事件(实际是EV_READ|EV_PERSIST),由当前线程处理(因为这里的event_base是该workers线程自己的)。


当该连接有可读数据时会回调event_handler函数,实际上event_handler里主要是调用memcached的核心方法drive_machine。


最后看看主线程是如何通知workers线程处理新连接的,主线程的libevent注册的是监听socket描述字的可读事件,就是说当有建立连接请求时,主线程会处理,回调的函数是也是event_handler(因为实际上主线程也是通过conn_new初始化的监听socket 的libevent可读事件)。


最后看看memcached网络事件处理的最核心部分- drive_machine


需要铭记于心的是drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine



  1. static void drive_machine(conn *c) {  

  2.     bool stop = false;  

  3.     int sfd, flags = 1;  

  4.     socklen_t addrlen;  

  5.     struct sockaddr_storage addr;  

  6.     int res;  

  7.   

  8.     assert(c != NULL);  

  9.   

  10.     while (!stop) {  

  11.   

  12.         switch(c->state) {  

  13.         case conn_listening:  

  14.             addrlen = sizeof(addr);  

  15.             if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {  

  16.                 //省去n多错误情况处理  

  17.                 break;  

  18.             }  

  19.             if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||  

  20.                 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {  

  21.                 perror(“setting O_NONBLOCK”);  

  22.                 close(sfd);  

  23.                 break;  

  24.             }  

  25.             dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,  

  26.                                      DATA_BUFFER_SIZE, false);  

  27.             break;  

  28.   

  29.         case conn_read:  

  30.             if (try_read_command(c) != 0) {  

  31.                 continue;  

  32.             }  

  33.         ….//省略  

  34.      }       

  35.  }  

首先大家不到被while循环误导(大部分做Java的同学都会马上联想到是个周而复始的loop)其实while通常满足一个case后就会break了,这里用while是考虑到垂直触发方式下,必须读到EWOULDBLOCK错误才可以。


言归正传,drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把该conn结构作为参数传递过来,就是该方法的形参。


memcached里连接的状态通过一个enum声明



  1. enum conn_states {  

  2.     conn_listening,  /** the socket which listens for connections */  

  3.     conn_read,       /** reading in a command line */  

  4.     conn_write,      /** writing out a simple response */  

  5.     conn_nread,      /** reading in a fixed number of bytes */  

  6.     conn_swallow,    /** swallowing unnecessary bytes w/o storing */  

  7.     conn_closing,    /** closing this connection */  

  8.     conn_mwrite,     /** writing out many items sequentially */  

  9. };  

实际对于case conn_listening:这种情况是主线程自己处理的,workers线程永远不会执行此分支


我们看到主线程进行了accept后调用了


dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, false);


这个函数就是通知workers线程的地方,看看



  1. void dispatch_conn_new(int sfd, int init_state, int event_flags,  

  2.                        int read_buffer_size, int is_udp) {  

  3.     CQ_ITEM *item = cqi_new();  

  4.     int thread = (last_thread + 1) % settings.num_threads;  

  5.   

  6.     last_thread = thread;  

  7.   

  8.     item->sfd = sfd;  

  9.     item->init_state = init_state;  

  10.     item->event_flags = event_flags;  

  11.     item->read_buffer_size = read_buffer_size;  

  12.     item->is_udp = is_udp;  

  13.   

  14.     cq_push(&threads[thread].new_conn_queue, item);  

  15.   

  16.     MEMCACHED_CONN_DISPATCH(sfd, threads[thread].thread_id);  

  17.     if (write(threads[thread].notify_send_fd, “”, 1) != 1) {  

  18.         perror(“Writing to thread notify pipe”);  

  19.     }  

  20. }  

可以清楚的看到,主线程首先创建了一个新的CQ_ITEM,然后通过round robin策略选择了一个thread


并通过cq_push将这个CQ_ITEM放入了该线程的CQ队列里,那么对应的workers线程是怎么知道的呢


就是通过这个


write(threads[thread].notify_send_fd, “”, 1)


向该线程管道写了1字节数据,则该线程的libevent立即回调了thread_libevent_process方法(上面已经描述过),然后那个线程取出item,注册读时间,当该条连接上有数据时,最终也会回调drive_machine方法,也就是drive_machine方法的 case conn_read:等全部是workers处理的,主线程只处理conn_listening 建立连接。


 


 


 


 

今天主要总结items相关的操作,items的操作分布比较多,定义和实现在memcachd.h/c、thread.h/c、items.h/c都有,感觉完全可以放在items.h/c中。这里就对所有的这些操作(除去stats部分)进行一个简单的总结。


首先对数据结构、ITEM_*宏和一些变量进行一个简单的说明,这里先建立一个宏观的概念,理解了它们的用途对后续阅读程序有很大的帮助。



  1. /** 
  2.  * Structure for storing items within memcached. 
  3.  */  
  4. typedef struct _stritem {  
  5.     struct _stritem *next;      // next, prev在LRU队列中使用   
  6.     struct _stritem *prev;  
  7.     struct _stritem *h_next;    /* hash chain next */     // 用于hashtable中   
  8.     rel_time_t      time;       /* least recent access */ // 最近访问时间   
  9.     rel_time_t      exptime;    /* expire time */         // 过期时间   
  10.     int             nbytes;     /* size of data */        // 数据的长度(数据格式等下面介绍)   
  11.     unsigned short  refcount;   // 引用计数   
  12.     uint8_t         nsuffix;    /* length of flags-and-length string */  
  13.     uint8_t         it_flags;   /* ITEM_* above */  
  14.     uint8_t         slabs_clsid;/* which slab class we’re in */  
  15.     uint8_t         nkey;       /* key length, w/terminating null and padding */  
  16.     /* this odd type prevents type-punning issues when we do 
  17.      * the little shuffle to save space when not using CAS. */  
  18.     union {  
  19.         uint64_t cas;  
  20.         char end;  
  21.     } data[];  
  22.     /* if it_flags & ITEM_CAS we have 8 bytes CAS */  
  23.     /* then null-terminated key */  
  24.     /* then ” flags length\r\n” (no terminating null) */  
  25.     /* then data with terminating \r\n (no terminating null; it’s binary!) */  
  26. } item;  

从上面的定义可以知道data字段的格式如下:


if it_flags & ITEM_CAS, 8bytes      # 如果ITEM_CAS标志设置时,这里有8字节的数据


key<null>                                       # 键值字符串,以null结尾,nkey长度不包括null字符


flags-length\r\n                               # flags-and-length字符串,以\r\n结尾,nsuffix长度包括\r\n


value\r\n                                         # 数据域,以\r\n结尾,nbytes长度包括\r\n


知道了具体的data字段的格式,ITEM_*宏就很容易理解了,主要考虑ITEM_CAS标志设置时,后面的key,flags-length及value字段的存取都需要后移8字节。



  1. // 获取cas值,即当<span style=”font-size:16px;”>ITEM_CAS标志设置时</span>,返回8字节的cas值,否则返回0   
  2. #define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \   
  3.         (i)->data->cas : (uint64_t)0)  
  4.   
  5. // 设置cas值,同上类似   
  6. #define ITEM_set_cas(i,v) { \   
  7.     if ((i)->it_flags & ITEM_CAS) { \  
  8.         (i)->data->cas = v; \  
  9.     } \  
  10. }  
  11. // 获得key值,<span style=”font-size:16px;”>如果ITEM_CAS标志设置时</span>,需要后移8字节   
  12. #define ITEM_key(item) (((char*)&((item)->data)) \   
  13.          + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))  
  14.   
  15. // 获得suffix字段,同上类似   
  16. #define ITEM_suffix(item) ((char*) &((item)->data) + (item)->nkey + 1 \   
  17.          + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))  
  18. // 获得数据域   
  19. #define ITEM_data(item) ((char*) &((item)->data) + (item)->nkey + 1 \   
  20.          + (item)->nsuffix \  
  21.          + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))  
  22.   
  23. // 获得总结构的大小   
  24. #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \   
  25.          + (item)->nsuffix + (item)->nbytes \  
  26.          + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))  

下面3个宏,定义了item的状态:



  1. #define ITEM_LINKED 1   
  2. #define ITEM_CAS 2   
  3. #define ITEM_SLABBED 4  

ITEM_LINKED表示item处于hashtable和LRU队列中,分别在do_item_link(将item放入hashtable和LRU队列)和do_item_unlink(将item从hashtable和LRU队列移除)中设置和置位,后续的对it_flags &ITEM_LINKED的判断,根据是否在hashtable和LRU队列中,从而能够进行或禁止某些操作,如:如果item在hashtable和LRU队列,就不能执行free_item操作(assert((it->it_flags & ITEM_LINKED) == 0)保证),必须首先do_item_unlink。


Item_CAS是根据用户命令行参数决定的,CAS表示check and set,只有cas字段一致时(check)才执行后续操作(set),否则操作失败。


ITEM_SLABBED,这个标志只有在item_free函数中,slabs_free函数调用前设置,说明只有将其放入slabs对应的slabclass_t的freelist指针数组中时,才设置此标志,即设置此标志的item应该为free状态,it_flags & ITEM_SLABBED即可以以此来理解。


下面说明LRU队列,主要包括heads和tails,sizes数组只是为了stats时使用,记录了每个对应的heads中item的数量:



  1. #define LARGEST_ID POWER_LARGEST    
  2. static item *heads[LARGEST_ID];         // LRU head buckets, 每个与slabclass对应,双向链表(每次插入放最前面)   
  3. static item *tails[LARGEST_ID];         // LRU tail buckets, 双向链表,记录了LRU队列的最后一个元素(oldest)  

slabclass定义为:



  1. #define MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1)   
  2. static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];   // slabclass数组  

感觉 #define LARGEST_ID POWER_LARGEST  改为    #define LARGEST_ID POWER_LARGEST+1 更合理一些,这样heads就与slabclass一一对应起来了,虽然slabclass及heads并不是每个位置都使用了。因为slabclass[0]并没有使用,所以heads[0]和tails[0]也没有使用(使用方法heads[slabs_clsid],slabs_clsid非0)。


全局设置setting中存在一个oldest_live的字段,表示在oldest_live时间之前的数据失效,比如flush_all操作(memcached的文本协议),设置oldest_live为当前时间,这样之前的数据都失效了,LRU算法执行时,就可以复用这些item了,这里称它为截止时间


还有就是对memcached中引用计数的理解,只有do_item_get(或do_item_get_nocheck)时引用计数++,do_item_remove时引用计数–,其它地方不涉及具体的操作。


只有item_link_q和item_unlink_q(名字后面有个_q)两个函数才操作heads和tails数组,完成对LRU队列的维护。


下面贴出代码,并进行相应的注释。



  1. /** 
  2.  * Generates the variable-sized part of the header for an object. 
  3.  * 
  4.  * key     – The key 
  5.  * nkey    – The length of the key(include the null terminator) 
  6.  * flags   – key flags 
  7.  * nbytes  – Number of bytes to hold value and addition CRLF terminator 
  8.  * suffix  – Buffer for the “VALUE” line suffix (flags, size). 
  9.  * nsuffix – The length of the suffix is stored here. 
  10.  * 
  11.  * Returns the total size of the header. 
  12.  */  
  13. static size_t item_make_header(const uint8_t nkey, const int flags, const int nbytes,  
  14.                      char *suffix, uint8_t *nsuffix) {  
  15.     /* suffix is defined at 40 chars elsewhere.. */  
  16.     *nsuffix = (uint8_t) snprintf(suffix, 40, ” %d %d\r\n”, flags, nbytes – 2);  
  17.     return sizeof(item) + nkey + *nsuffix + nbytes;  
  18. }  
  19.   
  20. /* 分配一个item结构 */  
  21. item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {  
  22.     uint8_t nsuffix;  
  23.     item *it = NULL;  
  24.     char suffix[40];  
  25.     size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);  
  26.     if (settings.use_cas) {  
  27.         ntotal += sizeof(uint64_t);         // if( (it_flags & ITEM_CAS)!=0), 8bytes CAS   
  28.     }  
  29.   
  30.     unsigned int id = slabs_clsid(ntotal);  // 返回对应的slab的clsid   
  31.     if (id == 0)                            // 查找失败   
  32.         return 0;  
  33.   
  34.     /* do a quick check if we have any expired items in the tail.. */  
  35.     int tries = 50;  
  36.     item *search;  
  37.     rel_time_t oldest_live = settings.oldest_live;  
  38.   
  39.     for (search = tails[id];  
  40.          tries > 0 && search != NULL;  
  41.          tries–, search=search->prev) {  
  42.         if (search->refcount == 0 &&            // 引用计数为0   
  43.             ((search->time < oldest_live) ||    // dead by flush,比oldest_live旧的已失效   
  44.              (search->exptime != 0 && search->exptime < current_time))) { // 过期   
  45.             it = search;  
  46.             /* I don’t want to actually free the object, just steal 
  47.              * the item to avoid to grab the slab mutex twice 😉 
  48.              */  
  49.             STATS_LOCK();  
  50.             stats.reclaimed++;  
  51.             STATS_UNLOCK();  
  52.             itemstats[id].reclaimed++;  
  53.             it->refcount = 1;           // 后面会重用此item   
  54.               
  55.             // 调整old item与新的item之间的差值,ITEM_ntotal(it)与ntotal计算方式相同   
  56.             slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);  
  57.               
  58.             do_item_unlink(it);                 // 从hashtable和LRU队列摘除item   
  59.             /* Initialize the item block: */  
  60.             it->slabs_clsid = 0;  
  61.             it->refcount = 0;  
  62.             break;  
  63.         }  
  64.     }  
  65.    
  66.     // 上面查找过程没有找到,则首先分配一个,如果分配失败,则通过LRU算法尝试摘除一些item   
  67.     // slabs_alloc:从slabclass[id]中分配一个size大小的trunk,错误时返回NULL(0)   
  68.     if (it == NULL && (it = slabs_alloc(ntotal, id)) == NULL) {  
  69.         /* 
  70.         ** Could not find an expired item at the tail, and memory allocation 
  71.         ** failed. Try to evict some items! 
  72.         */  
  73.   
  74.         /* If requested to not push old items out of cache when memory runs out, 
  75.          * we’re out of luck at this point… 
  76.          */  
  77.   
  78.         // 通过M指定不使用LRU算法淘汰old item   
  79.         // -M: return error on memory exhausted (rather than removing items)   
  80.         if (settings.evict_to_free == 0) {  
  81.             itemstats[id].outofmemory++;  
  82.             return NULL;  
  83.         }  
  84.   
  85.         /* 
  86.          * try to get one off the right LRU 
  87.          * don’t necessarily unlink the tail because it may be locked: refcount>0 
  88.          * search up from tail an item with refcount==0 and unlink it; give up after 50 
  89.          * tries 
  90.          */  
  91.   
  92.         if (tails[id] == 0) {  
  93.             itemstats[id].outofmemory++;  
  94.             return NULL;  
  95.         }  
  96.         tries = 50; // 最多尝试50次   
  97.         for (search = tails[id]; tries > 0 && search != NULL; tries–, search=search->prev)   
  98.         {  
  99.             if (search->refcount == 0) // 引用计数为0   
  100.             {  
  101.                 if (search->exptime == 0 || search->exptime > current_time)   
  102.                 {  
  103.                     itemstats[id].evicted++;  
  104.                     itemstats[id].evicted_time = current_time – search->time;  
  105.                     if (search->exptime != 0)  
  106.                         itemstats[id].evicted_nonzero++;  
  107.                     STATS_LOCK();  
  108.                     stats.evictions++;  
  109.                     STATS_UNLOCK();  
  110.                 }   
  111.                 else   
  112.                 {  
  113.                     itemstats[id].reclaimed++;  
  114.                     STATS_LOCK();  
  115.                     stats.reclaimed++;  
  116.                     STATS_UNLOCK();  
  117.                 }  
  118.                 do_item_unlink(search); // 从hashtable及LRU队列摘除item项   
  119.                   
  120.                 break;                  // 找到第一个引用计数为0的就结束此循环   
  121.             }  
  122.         }  
  123.         // 这里重新分配,如果上面循环回收了item结构,这里能够分配成功   
  124.         // 否则,分配会失败,然后强制回收时间锁定(refcount>0)时间过长(超过3小时)的item   
  125.         it = slabs_alloc(ntotal, id);     
  126.         if (it == 0)   
  127.         {  
  128.             itemstats[id].outofmemory++;  
  129.             /* Last ditch effort. There is a very rare bug which causes 
  130.              * refcount leaks. We’ve fixed most of them, but it still happens, 
  131.              * and it may happen in the future. 
  132.              * We can reasonably assume no item can stay locked for more than 
  133.              * three hours, so if we find one in the tail which is that old, 
  134.              * free it anyway. 
  135.              */  
  136.             tries = 50;  
  137.             for (search = tails[id]; tries > 0 && search != NULL; tries–, search=search->prev)   
  138.             {  
  139.                 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time)   
  140.                 {  
  141.                     itemstats[id].tailrepairs++;  
  142.                     search->refcount = 0;  
  143.                     do_item_unlink(search);  
  144.                     break;  
  145.                 }  
  146.             }  
  147.             it = slabs_alloc(ntotal, id);  
  148.             if (it == 0) {  
  149.                 return NULL;  
  150.             }  
  151.         }  
  152.     }  
  153.   
  154.     assert(it->slabs_clsid == 0);  
  155.   
  156.     it->slabs_clsid = id;  
  157.   
  158.     assert(it != heads[it->slabs_clsid]);  
  159.   
  160.     it->next = it->prev = it->h_next = 0;  
  161.     it->refcount = 1;     /* the caller will have a reference */  
  162.     DEBUG_REFCNT(it, ‘*’);  
  163.     it->it_flags = settings.use_cas ? ITEM_CAS : 0; //? 0   
  164.     it->nkey = nkey;                                // key由null结尾,nkey长度不包括null字符   
  165.     it->nbytes = nbytes;                            // value由\r\n结尾,nbytes长度包括\r\n   
  166.     memcpy(ITEM_key(it), key, nkey);  
  167.     it->exptime = exptime;  
  168.     memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);  
  169.     it->nsuffix = nsuffix;                          // suffix由\r\n结尾,nsuffix长度包括\r\n   
  170.     return it;  
  171. }  
  172.   
  173. void item_free(item *it) {  
  174.     size_t ntotal = ITEM_ntotal(it);  
  175.     unsigned int clsid;  
  176.     assert((it->it_flags & ITEM_LINKED) == 0);  
  177.     assert(it != heads[it->slabs_clsid]);  
  178.     assert(it != tails[it->slabs_clsid]);  
  179.     assert(it->refcount == 0);  
  180.   
  181.     /* so slab size changer can tell later if item is already free or not */  
  182.     clsid = it->slabs_clsid;  
  183.     it->slabs_clsid = 0;  
  184.     it->it_flags |= ITEM_SLABBED;  
  185.     DEBUG_REFCNT(it, ‘F’);  
  186.     // 将ptr指向的item结构放入slabclass[clsid]的slabclass_t的freelist数组中   
  187.     slabs_free(it, ntotal, clsid);  
  188. }  
  189.   
  190. /** 
  191.  * Returns true if an item will fit in the cache (its size does not exceed 
  192.  * the maximum for a cache entry.) 
  193.  */  
  194. bool item_size_ok(const size_t nkey, const int flags, const int nbytes) {  
  195.     char prefix[40];  
  196.     uint8_t nsuffix;  
  197.   
  198.     size_t ntotal = item_make_header(nkey + 1, flags, nbytes,  
  199.                                      prefix, &nsuffix);  
  200.     if (settings.use_cas) {  
  201.         ntotal += sizeof(uint64_t);  
  202.     }  
  203.   
  204.     return slabs_clsid(ntotal) != 0;  
  205. }  
  206.   
  207. // 把该item插入LRU队列   
  208. static void item_link_q(item *it) { /* item is the new head */  
  209.     item **head, **tail;  
  210.     assert(it->slabs_clsid < LARGEST_ID);  
  211.     assert((it->it_flags & ITEM_SLABBED) == 0);  
  212.   
  213.     head = &heads[it->slabs_clsid];  
  214.     tail = &tails[it->slabs_clsid];  
  215.     assert(it != *head);  
  216.     assert((*head && *tail) || (*head == 0 && *tail == 0));  
  217.     it->prev = 0;  
  218.     it->next = *head;  
  219.     if (it->next) it->next->prev = it;  
  220.     *head = it;  
  221.     if (*tail == 0) *tail = it;  
  222.     sizes[it->slabs_clsid]++;  
  223.     return;  
  224. }  
  225.   
  226. // 从LRU队列删除此item   
  227. static void item_unlink_q(item *it) {  
  228.     item **head, **tail;  
  229.     assert(it->slabs_clsid < LARGEST_ID);  
  230.     head = &heads[it->slabs_clsid];  
  231.     tail = &tails[it->slabs_clsid];  
  232.     // 头部   
  233.     if (*head == it) {  
  234.         assert(it->prev == 0);  
  235.         *head = it->next;  
  236.     }  
  237.     // 尾部   
  238.     if (*tail == it) {  
  239.         assert(it->next == 0);  
  240.         *tail = it->prev;  
  241.     }  
  242.     assert(it->next != it);  
  243.     assert(it->prev != it);  
  244.   
  245.     // 改变双向链表指针   
  246.     if (it->next) it->next->prev = it->prev;  
  247.     if (it->prev) it->prev->next = it->next;  
  248.     sizes[it->slabs_clsid]–;  
  249.     return;  
  250. }  
  251.   
  252. // 将item放入hashtable和LRU队列   
  253. int do_item_link(item *it) {  
  254.     MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);  
  255.     assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);  
  256.     it->it_flags |= ITEM_LINKED;    // 设置ITEM_LINKED标志   
  257.     it->time = current_time;        // 最近访问时间为当前时间   
  258.     assoc_insert(it);               // 将item指针插入hash表中   
  259.   
  260.     STATS_LOCK();  
  261.     stats.curr_bytes += ITEM_ntotal(it);  
  262.     stats.curr_items += 1;  
  263.     stats.total_items += 1;  
  264.     STATS_UNLOCK();  
  265.   
  266.     /* Allocate a new CAS ID on link. */  
  267.     ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);// 调用get_cas_id()给item的cas_id赋值   
  268.   
  269.     // 把该item插入LRU队列(heads, tails, sizes)   
  270.     item_link_q(it);  
  271.   
  272.     return 1;  
  273. }  
  274.   
  275. // 从hashtable及LRU队列摘除item项   
  276. void do_item_unlink(item *it) {  
  277.     MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);  
  278.     if ((it->it_flags & ITEM_LINKED) != 0)   
  279.     {  
  280.         it->it_flags &= ~ITEM_LINKED;  
  281.         STATS_LOCK();  
  282.         stats.curr_bytes -= ITEM_ntotal(it);  
  283.         stats.curr_items -= 1;  
  284.         STATS_UNLOCK();  
  285.         assoc_delete(ITEM_key(it), it->nkey);// 从hashtable中删除key对应的item   
  286.         item_unlink_q(it);                   // 从LRU队列删除此item   
  287.         if (it->refcount == 0)               // 引用计数为1时,删除此item   
  288.             item_free(it);  
  289.     }  
  290. }  
  291. // 减少引用计数refcount,引用计数为0的时候,就将其释放   
  292. void do_item_remove(item *it)   
  293. {  
  294.     MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);  
  295.     assert((it->it_flags & ITEM_SLABBED) == 0);  
  296.     if (it->refcount != 0)   
  297.     {  
  298.         it->refcount–;         // 减少引用计数   
  299.         DEBUG_REFCNT(it, ‘-‘);  
  300.     }  
  301.     if (it->refcount == 0 && (it->it_flags & ITEM_LINKED) == 0)   
  302.     {  
  303.         item_free(it);  
  304.     }  
  305. }  
  306. // 更新item时间戳   
  307. // 先调用item_unlink_q(),更新了时间以后,再调用item_link_q(),   
  308. // 将其重新连接到LRU队列之中,即让该item移到LRU队列的最前   
  309. void do_item_update(item *it) {  
  310.     MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);  
  311.     if (it->time < current_time – ITEM_UPDATE_INTERVAL) {  
  312.         assert((it->it_flags & ITEM_SLABBED) == 0);  
  313.   
  314.         if ((it->it_flags & ITEM_LINKED) != 0) {  
  315.             item_unlink_q(it);  
  316.             it->time = current_time;  
  317.             item_link_q(it);  
  318.         }  
  319.     }  
  320. }  
  321. // 调用do_item_unlink()解除原有it的连接,再调用do_item_link()连接到新的new_it   
  322. int do_item_replace(item *it, item *new_it) {  
  323.     MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,  
  324.                            ITEM_key(new_it), new_it->nkey, new_it->nbytes);  
  325.     assert((it->it_flags & ITEM_SLABBED) == 0);  
  326.   
  327.     do_item_unlink(it);             // 从hashtable及LRU队列摘除it   
  328.     return do_item_link(new_it);    // 将new_it插入hashtable和LRU队列   
  329. }  

 



  1. /** wrapper around assoc_find which does the lazy expiration logic */  
  2. item *do_item_get(const char *key, const size_t nkey) {  
  3.     item *it = assoc_find(key, nkey);   // 从hashtable查找key   
  4.     int was_found = 0;  
  5.   
  6.     if (settings.verbose > 2) {  
  7.         if (it == NULL) {  
  8.             fprintf(stderr, “> NOT FOUND %s”, key);  
  9.         } else {  
  10.             fprintf(stderr, “> FOUND KEY %s”, ITEM_key(it));  
  11.             was_found++;  
  12.         }  
  13.     }  
  14.     // 命中,且item存取时间比截止时间早,已失效   
  15.     if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= current_time &&  
  16.         it->time <= settings.oldest_live) {  
  17.         do_item_unlink(it);           /* MTSAFE – cache_lock held */  
  18.         it = NULL;  
  19.     }  
  20.   
  21.     if (it == NULL && was_found) {  
  22.         fprintf(stderr, ” -nuked by flush”);  
  23.         was_found–;  
  24.     }  
  25.   
  26.     // 命中,且item已过期   
  27.     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {  
  28.         do_item_unlink(it);           /* MTSAFE – cache_lock held */  
  29.         it = NULL;  
  30.     }  
  31.   
  32.     if (it == NULL && was_found) {  
  33.         fprintf(stderr, ” -nuked by expire”);  
  34.         was_found–;  
  35.     }  
  36.   
  37.     if (it != NULL) {  
  38.         it->refcount++;  
  39.         DEBUG_REFCNT(it, ‘+’);  
  40.     }  
  41.   
  42.     if (settings.verbose > 2)  
  43.         fprintf(stderr, “\n”);  
  44.   
  45.     return it;  
  46. }  
  47.   
  48. /** returns an item whether or not it’s expired. */  
  49. item *do_item_get_nocheck(const char *key, const size_t nkey) {  
  50.     item *it = assoc_find(key, nkey);  
  51.     if (it) {  
  52.         it->refcount++;  
  53.         DEBUG_REFCNT(it, ‘+’);  
  54.     }  
  55.     return it;  
  56. }  
  57.   
  58. /* expires items that are more recent than the oldest_live setting. */// 执行flush操作,即将所有当前有效的item清空(flush)  
  59.  void do_item_flush_expired(void) {  
  60.     int i;  
  61.     item *iter, *next;  
  62.     if (settings.oldest_live == 0)  
  63.         return;  
  64.     for (i = 0; i < LARGEST_ID; i++) {  
  65.         /* The LRU is sorted in decreasing time order, and an item’s timestamp 
  66.          * is never newer than its last access time, so we only need to walk 
  67.          * back until we hit an item older than the oldest_live time. 
  68.          * The oldest_live checking will auto-expire the remaining items. 
  69.          */  
  70.         for (iter = heads[i]; iter != NULL; iter = next) {  
  71.             if (iter->time >= settings.oldest_live) {     // 比截止时间新的数据   
  72.                 next = iter->next;  
  73.                 if ((iter->it_flags & ITEM_SLABBED) == 0) {  
  74.                     do_item_unlink(iter);  
  75.                 }  
  76.             } else {  <pre name=“code” class=“cpp”>               // 比截止时间旧的数据已经在之前回收了,  

// 由于是按照时间降序排列的(越新越靠前),所以碰到第一个older的数据就停止 /* We’ve hit the first old item. Continue to the next queue. */ break; } } }}


下面的函数都是上层调用的,主要与memcached的协议关系比较紧密,同时保证了临界资源的互斥访问(cache_lock),最好提前熟悉一下memcached的文本协议。


  1. /*  
  2.  * Allocates a new item.  
  3.  */  
  4. // 新分配一个item结构  
  5. item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {  
  6.     item *it;  
  7.     pthread_mutex_lock(&cache_lock);  
  8.   
  9.     // do_item_alloc:新分配一个item结构,首先快速扫描是否存在过期,存在则复用item结构,  
  10.     // 如果没有,就新分配一个,如果分配失败,则使用LRU算法进行回收  
  11.     it = do_item_alloc(key, nkey, flags, exptime, nbytes);  
  12.       
  13.     pthread_mutex_unlock(&cache_lock);  
  14.     return it;  
  15. }  
  16.   
  17. /*  
  18.  * Returns an item if it hasn’t been marked as expired,  
  19.  * lazy-expiring as needed.  
  20.  */  
  21.  // 查询key值的item  
  22. item *item_get(const char *key, const size_t nkey) {  
  23.     item *it;  
  24.     pthread_mutex_lock(&cache_lock);  
  25.     it = do_item_get(key, nkey);      // 查询item,如果成功,这里会增加引用计数  
  26.     pthread_mutex_unlock(&cache_lock);  
  27.     return it;  
  28. }  
  29. /*  
  30.  * Decrements the reference count on an item and adds it to the freelist if  
  31.  * needed.  
  32.  */  
  33. void item_remove(item *item) {  
  34.     pthread_mutex_lock(&cache_lock);  
  35.     do_item_remove(item);           // 减少item的引用计数,如果降为0,则释放  
  36.     pthread_mutex_unlock(&cache_lock);  
  37. }  
  38.   
  39. /*  
  40.  * Links an item into the LRU and hashtable.  
  41.  */  
  42. // 将item放入hashtable和LRU队列  
  43. int item_link(item *item) {  
  44.     int ret;  
  45.   
  46.     pthread_mutex_lock(&cache_lock);  
  47.     ret = do_item_link(item);  
  48.     pthread_mutex_unlock(&cache_lock);  
  49.     return ret;  
  50. }  
  51. /*  
  52.  * Unlinks an item from the LRU and hashtable.  
  53.  */  
  54. // 从hashtable和LRU队列去除item  
  55. void item_unlink(item *item) {  
  56.     pthread_mutex_lock(&cache_lock);  
  57.     do_item_unlink(item);  
  58.     pthread_mutex_unlock(&cache_lock);  
  59. }  
  60.   
  61. /*  
  62.  * Replaces one item with another in the hashtable.  
  63.  * Unprotected by a mutex lock since the core server does not require  
  64.  * it to be thread-safe.  
  65.  */  
  66. int item_replace(item *old_it, item *new_it) {  
  67.     // 替换,首先将old_it从hashtable和LRU删除,再讲new_it加入hashtable和LRU  
  68.     return do_item_replace(old_it, new_it);  
  69. }  
  70.   
  71. /*  
  72.  * Moves an item to the back of the LRU queue.  
  73.  */  
  74. void item_update(item *item) {  
  75.     pthread_mutex_lock(&cache_lock);  
  76.     do_item_update(item);  
  77.     pthread_mutex_unlock(&cache_lock);  
  78. }  
  79.   
  80. /*  
  81.  * Stores an item in the cache (high level, obeys set/add/replace semantics)  
  82.  */  
  83. enum store_item_type store_item(item *item, int comm, conn* c) {  
  84.     enum store_item_type ret;  
  85.   
  86.     pthread_mutex_lock(&cache_lock);  
  87.     ret = do_store_item(item, comm, c);  
  88.     pthread_mutex_unlock(&cache_lock);  
  89.     return ret;  
  90. }  
  91.   
  92. /*  
  93.  * Stores an item in the cache according to the semantics of one of the set  
  94.  * commands. In threaded mode, this is protected by the cache lock.  
  95.  *  
  96.  * Returns the state of storage.  
  97.  */  
  98. enum store_item_type do_store_item(item *it, int comm, conn *c) {  
  99.     char *key = ITEM_key(it);  
  100.     item *old_it = do_item_get(key, it->nkey);  // 查询key值的item  
  101.     enum store_item_type stored = NOT_STORED;  
  102.   
  103.     item *new_it = NULL;  
  104.     int flags;  
  105.   
  106.     if (old_it != NULL && comm == NREAD_ADD) { // 已经存在  
  107.         /* add only adds a nonexistent item, but promote to head of LRU */  
  108.         do_item_update(old_it);  
  109.     } else if (!old_it && (comm == NREAD_REPLACE  
  110.         || comm == NREAD_APPEND || comm == NREAD_PREPEND))  
  111.     {  
  112.         /* replace only replaces an existing value; don’t store */  
  113.     } else if (comm == NREAD_CAS) {  
  114.         /* validate cas operation */  
  115.         if(old_it == NULL) {  
  116.             // LRU expired  
  117.             stored = NOT_FOUND;  
  118.             pthread_mutex_lock(&c->thread->stats.mutex);  
  119.             c->thread->stats.cas_misses++;  
  120.             pthread_mutex_unlock(&c->thread->stats.mutex);  
  121.         }  
  122.         else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {// cas(check and set)一致时,才操作  
  123.             // cas validates  
  124.             // it and old_it may belong to different classes.  
  125.             // I’m updating the stats for the one that’s getting pushed out  
  126.             pthread_mutex_lock(&c->thread->stats.mutex);  
  127.             c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;  
  128.             pthread_mutex_unlock(&c->thread->stats.mutex);  
  129.   
  130.             item_replace(old_it, it);  
  131.             stored = STORED;  
  132.         } else {  
  133.             pthread_mutex_lock(&c->thread->stats.mutex);  
  134.             c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;  
  135.             pthread_mutex_unlock(&c->thread->stats.mutex);  
  136.   
  137.             if(settings.verbose > 1) {  
  138.                 fprintf(stderr, “CAS:  failure: expected %llu, got %llu\n”,  
  139.                         (unsigned long long)ITEM_get_cas(old_it),  
  140.                         (unsigned long long)ITEM_get_cas(it));  
  141.             }  
  142.             stored = EXISTS;  
  143.         }  
  144.     } else {  
  145.         /*  
  146.          * Append – combine new and old record into single one. Here it’s  
  147.          * atomic and thread-safe.  
  148.          */  
  149.         if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {  
  150.             /*  
  151.              * Validate CAS  
  152.              */  
  153.             if (ITEM_get_cas(it) != 0) {  
  154.                 // CAS much be equal  
  155.                 if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {  
  156.                     stored = EXISTS;  
  157.                 }  
  158.             }  
  159.   
  160.             if (stored == NOT_STORED) {  
  161.                 /* we have it and old_it here – alloc memory to hold both */  
  162.                 /* flags was already lost – so recover them from ITEM_suffix(it) */  
  163.   
  164.                 flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);  
  165.   
  166.                 // 新建一个item  
  167.                 new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes – 2 /* CRLF */);  
  168.   
  169.                 if (new_it == NULL) {  
  170.                     /* SERVER_ERROR out of memory */  
  171.                     if (old_it != NULL)  
  172.                         do_item_remove(old_it);  
  173.   
  174.                     return NOT_STORED;  
  175.                 }  
  176.   
  177.                 /* copy data from it and old_it to new_it */  
  178.   
  179.                 if (comm == NREAD_APPEND) {  
  180.                     memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);  
  181.                     memcpy(ITEM_data(new_it) + old_it->nbytes – 2 /* CRLF */, ITEM_data(it), it->nbytes);  
  182.                 } else {  
  183.                     /* NREAD_PREPEND */  
  184.                     memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);  
  185.                     memcpy(ITEM_data(new_it) + it->nbytes – 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);  
  186.                 }  
  187.   
  188.                 it = new_it;  
  189.             }  
  190.         }  
  191.   
  192.         if (stored == NOT_STORED) {  
  193.             if (old_it != NULL)  
  194.                 item_replace(old_it, it);  
  195.             else  
  196.                 do_item_link(it);  
  197.   
  198.             c->cas = ITEM_get_cas(it);  
  199.   
  200.             stored = STORED;  
  201.         }  
  202.     }  
  203.   
  204.     if (old_it != NULL)  
  205.         do_item_remove(old_it);         /* release our reference */  
  206.     if (new_it != NULL)  
  207.         do_item_remove(new_it);  
  208.   
  209.     if (stored == STORED) {  
  210.         c->cas = ITEM_get_cas(it);  
  211.     }  
  212.   
  213.     return stored;  
  214. }  
  215.   
  216. /*  
  217.  * Flushes expired items after a flush_all call  
  218.  */  
  219. // 执行flush_all操作,清空所有items  
  220. void item_flush_expired() {  
  221.     pthread_mutex_lock(&cache_lock);  
  222.     do_item_flush_expired();  
  223.     pthread_mutex_unlock(&cache_lock);  
  224. }   

总结:感觉items相关的操作并没有很高的复用价值,这个主要是理解slab和hashtable的接口及使用方法,设计的算法和数据结构并不多,本身逻辑也并不是很复杂。

memcached守护进程创建函数位于daemon.c中,创建方式同其它守护进程相同,这里只是作为备忘,特此设置转载。


首先引用《Linux 守护进程 Daemon》 (见 http://www.linuxidc.com/Linux/2011-12/49326.htm )中守护进程的创建过程,然后附上memcached的daemon.c源码说明。


Linux 守护进程编程


守护进程最重要的特性是后台运行;其次守护进程必须与其运行前的环境隔离开来,这些环境包括未关闭的文件描述符,控制终端,会话和进程组,工作目录以及文件创建模式等。这些环境通常是守护进程从执行它的父进程(特别是shell)中继承下来的;最后守护进程的启动方式有其特殊之处。它可以在Linux系统启动时从启动脚本/etc/rc.d中启动,可以由作业规划进程crond启动,还可以由用户终端(通常是shell)执行。


总之,除开这些特殊性以外,守护进程与普通进程基本上没有什么区别。因此,编写守护进程实际上是把一个普通进程按照上述的守护进程的特性改造成为守护进程。


编写守护进程的步骤:


(1)在父进程中执行fork并exit推出;


(2)在子进程中调用setsid函数创建新的会话;


(3)在子进程中调用chdir函数,让根目录 ”/” 成为子进程的工作目录;


(4)在子进程中调用umask函数,设置进程的umask为0;


(5)在子进程中关闭任何不需要的文件描述符


setsid函数


#include<unistd.h>


pid_tsetsid(void);


返回值:成功返回新会话的ID;失败返回-1


setsid函数创建一个新会话和新进程组,setsid调用保证新会话没有控制终端。


守护进程调用该函数将成为新会话的会话领导和新进程组的进程组领导。


chdir函数


#include<unistd.h>


intchdir(const char *path);


chdir函数改变当前的工作目录为path所包含的新目录


umask函数


#include


voidumask(int mask);   // r-4; w-2; x-1


umask函数改变目录和文件的创建模式。


下面是memcached中的源码及说明:



  1. // create daemon process    

  2. int daemonize(int nochdir, int noclose)  

  3. {  

  4.     int fd;  

  5.   

  6. // 调用fork产生一个子进程,同时父进程退出。我们所有后续工作都在子进程中完成。   

  7. // 这样做我们可以:如果我们是从命令行执行的该程序,这可以造成程序执行完毕的假象,shell会返回   

  8. // 刚刚通过fork产生的新进程一定不会是一个进程组的组长,这为第2步的执行提供了前提保障。   

  9. // 这样做还会出现一种很有趣的现象:由于父进程已经先于子进程退出,会造成子进程没有父进程,   

  10. // 变成一个孤儿进程(orphan)。每当系统发现一个孤儿进程,就会自动由1号进程收养它,这样   

  11. // 原先的子进程就会变成1号进程的子进程。   

  12.     switch (fork()) {  

  13.     case -1:  

  14.         return (-1);  

  15.     case 0:  

  16.         break;  

  17.     default:  

  18.         _exit(EXIT_SUCCESS);  

  19.     }  

  20. // 调用setsid系统调用。这是整个过程中最重要的一步,   

  21. // 它的作用是创建一个新的会话(session),并自任该会话的组长(session leader)   

  22. // 如果调用进程是一个进程组的组长,调用就会失败,但这已经在第1步得到了保证。   

  23. // 调用setsid有3个作用:    

  24. //   让进程摆脱原会话的控制;    

  25. //   让进程摆脱原进程组的控制;    

  26. //   让进程摆脱原控制终端的控制;    

  27. // 总之,就是让进程完全独立出来,脱离所有其他进程的控制   

  28.     if (setsid() == -1) // setsid – run a program in a new session   

  29.         return (-1);  

  30.   

  31. /* 

  32. 把当前工作目录切换到根目录。如果我们是在一个临时加载的文件系统上执行这个进程的, 

  33. 比如:/mnt/floppy/,该进程的当前工作目录就会是/mnt/floppy/。在整个进程运行期间 

  34. 该文件系统都无法被卸下(umount),而无论我们是否在使用这个文件系统,这会给我们 

  35. 带来很多不便。解决的方法是使用chdir系统调用把当前工作目录变为根目录,应该不会有 

  36. 人想把根目录卸下吧。 

  37. 当然,在这一步里,如果有特殊的需要,我们也可以把当前工作目录换成其他的路径,比如/tmp 

  38. */  

  39.     if (nochdir == 0) {  

  40.         if(chdir(“/”) != 0) {  

  41.             perror(“chdir”);  

  42.             return (-1);  

  43.         }  

  44.     }  

  45. /* 

  46. 将文件权限掩码设为0。这需要调用系统调用umask,每个进程都会从父进程那里继承 

  47. 一个文件权限掩码,当创建新文件时,这个掩码被用于设定文件的默认访问权限,屏蔽掉某些权限, 

  48. 如一般用户的写权限。当另一个进程用exec调用我们编写的daemon程序时,由于我们不知道那个进程 

  49. 的文件权限掩码是什么,这样在我们创建新文件时,就会带来一些麻烦。所以,我们应该重新设置文 

  50. 件权限掩码,我们可以设成任何我们想要的值,但一般情况下,大家都把它设为0,这样,它就不会 

  51. 屏蔽用户的任何操作。 

  52. 如果你的应用程序根本就不涉及创建新文件或是文件访问权限的设定,你也完全可以把文件权限 

  53. 掩码一脚踢开,跳过这一步。关闭所有不需要的文件。同文件权限掩码一样,我们的新进程会从父 

  54. 进程那里继承一些已经打开了的文件。这些被打开的文件可能永远不被我们的daemon进程读或写, 

  55. 但它们一样消耗系统资源,而且可能导致所在的文件系统无法卸下。需要指出的是,文件描述符为0、1和2 

  56. 的三个文件,也就是我们常说的输入、输出和报错这三个文件也需要被关闭。 

  57. 很可能不少读者会对此感到奇怪,难道我们不需要输入输出吗?但事实是,在上面的第2步 

  58. 后,我们的daemon进程已经与所属的控制终端失去了联系,我们从终端输入的字符不可能达到daemon进程, 

  59. daemon进程用常规的方法(如printf)输出的字符也不可能在我们的终端上显示出来。所以这三个文件已经 

  60. 失去了存在的价值,也应该被关闭。 

  61. */  

  62.   

  63.     // 设置文件权限掩码   

  64.     // umask(0) ; // 这里原本没有,只是为了上面的基本流程说明加上的   

  65.   

  66.     if (noclose == 0 && (fd = open(“/dev/null”, O_RDWR, 0)) != -1) {  

  67.         if(dup2(fd, STDIN_FILENO) < 0) {        // 重定向, dup, dup2 – duplicate a file descriptor   

  68.             perror(“dup2 stdin”);               // int dup2(int oldfd, int newfd);   

  69.                                                 // dup2() makes newfd be the copy of oldfd, closing newfd first if necessary   

  70.             return (-1);  

  71.         }  

  72.         if(dup2(fd, STDOUT_FILENO) < 0) {  

  73.             perror(“dup2 stdout”);  

  74.             return (-1);  

  75.         }  

  76.         if(dup2(fd, STDERR_FILENO) < 0) {  

  77.             perror(“dup2 stderr”);  

  78.             return (-1);  

  79.         }  

  80.   

  81.         if (fd > STDERR_FILENO) {  

  82.             if(close(fd) < 0) {  

  83.                 perror(“close”);  

  84.                 return (-1);  

  85.             }  

  86.         }  

  87.     }  

  88.     return (0);  

  89. }  

研究memcached源码有段日子了,前面几篇文章把memcached主要模块进行了介绍,主要有:


memcached源码学习-内存管理机制slab allocator http://www.linuxidc.com/Linux/2012-01/52515.htm


memcached源码学习-hashtable http://www.linuxidc.com/Linux/2012-01/52515p2.htm


memcached源码学习-多线程模型 http://www.linuxidc.com/Linux/2012-01/52515p3.htm


memcached源码学习-items操作 http://www.linuxidc.com/Linux/2012-01/52515p4.htm


memcached源码学习-daemon进程 http://www.linuxidc.com/Linux/2012-01/52515p5.htm


memcached模块化和复用比较好的模块为slab内存分配机制、hashtable、多线程模型和daemon(这个直接拿去用吧,其它开源软件的也一样),items相关的操作分布较乱,且与memcached协议具有一定的关联,重用几率很小,难道这也是前几个模块划分非常好的原因!


memcached的事件模型这里没有介绍,它采用了libevent模型,这个可以通过学习libevent来了解,后续我会再去学习libevent的源码,并与大家分享。


memcached源码得研究工作到此暂告一段落,以后在实际工作项目中,可以根据需求对上述模块进行修改并复用,这就是一笔前辈留下的财富。

赞(0) 打赏
转载请注明出处:服务器评测 » Memcached源码学习
分享到: 更多 (0)

听说打赏我的人,都进福布斯排行榜啦!

支付宝扫一扫打赏

微信扫一扫打赏