Redis 数据结构

简单动态字符串SDS


Redis没有直接使用传统字符串表示,而是构建一种名为简单动态字符串的抽象类型,并将SDS用作Redis默认字符串表示。

SDS定义如下:

struct sdshdr{

    //记录buf数组中已使用字节的数量
    //	等于SDS所保存字符串的长度
    int len;
    //记录buf数组中未使用字节数量;
    int free;
    //字节数组,用于保存字符串;
    char buf[];

}

SDS与C字符串区别:

  1. 常数复杂度获取字符串长度
  2. 杜绝缓冲区溢出
  3. 减少修改字符串时带来的内存重分配次数
  4. 二进制安全
  5. 兼容部分C字符串函数

链表


链表节点定义:

typedef struct listNode {

    // 前置节点
    struct listNode *prev;

    // 后置节点
    struct listNode *next;

    // 节点的值
    void *value;

} listNode;

链表节点定义:

typedef struct list {

    // 表头节点
    listNode *head;

    // 表尾节点
    listNode *tail;

    // 链表所包含的节点数量
    unsigned long len;

    // 节点值复制函数
    void *(*dup)(void *ptr);

    // 节点值释放函数
    void (*free)(void *ptr);

    // 节点值对比函数
    int (*match)(void *ptr, void *key);

} list;

Redis 的链表实现的特性可以总结如下:

  • 双端: 链表节点带有 prev 和 next 指针, 获取某个节点的前置节点和后置节点的复杂度都是 O(1) 。
  • 无环: 表头节点的 prev 指针和表尾节点的 next 指针都指向 NULL , 对链表的访问以 NULL 为终点。
  • 带表头指针和表尾指针: 通过 list 结构的 head 指针和 tail 指针, 程序获取链表的表头节点和表尾节点的复杂度为 O(1) 。
  • 带链表长度计数器: 程序使用 list 结构的 len 属性来对 list 持有的链表节点进行计数, 程序获取链表中节点数量的复杂度为 O(1)。
  • 多态: 链表节点使用 void* 指针来保存节点值, 并且可以通过 list 结构的 dup 、 free 、 match 三个属性为节点值设置类型特定函数, 所以链表可以用于保存各种不同类型的值。

字典


Redis 的字典使用哈希表作为底层实现, 一个哈希表里面可以有多个哈希表节点, 而每个哈希表节点就保存了字典中的一个键值对。

哈希表节点

哈希表节点使用 dictEntry 结构表示, 每个 dictEntry 结构都保存着一个键值对:
typedef struct dictEntry {

    // 键
    void *key;

    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;

    // 指向下个哈希表节点,形成链表
    struct dictEntry *next;

} dictEntry;

key 属性保存着键值对中的键, 而 v 属性则保存着键值对中的值, 其中键值对的值可以是一个指针, 或者是一个 uint64_t 整数, 又或者是一个 int64_t 整数。

next 属性是指向另一个哈希表节点的指针, 这个指针可以将多个哈希值相同的键值对连接在一次, 以此来解决键冲突(collision)的问题。

哈希表

typedef struct dictht {

    // 哈希表数组
    dictEntry **table;

    // 哈希表大小
    unsigned long size;

    // 哈希表大小掩码,用于计算索引值
    // 总是等于 size - 1
    unsigned long sizemask;

    // 该哈希表已有节点的数量
    unsigned long used;

} dictht;

table 属性是一个数组, 数组中的每个元素都是一个指向 dict.h/dictEntry 结构的指针, 每个 dictEntry 结构保存着一个键值对。

size 属性记录了哈希表的大小, 也即是 table 数组的大小, 而 used 属性则记录了哈希表目前已有节点(键值对)的数量。

sizemask 属性的值总是等于 size - 1 , 这个属性和哈希值一起决定一个键应该被放到 table 数组的哪个索引上面。

一个大小为 4 的空哈希表 (没有包含任何键值对)

字典

typedef struct dict {

    // 类型特定函数
    dictType *type;

    // 私有数据
    void *privdata;

    // 哈希表
    dictht ht[2];

    // rehash 索引
    // 当 rehash 不在进行时,值为 -1
    int rehashidx; /* rehashing not in progress if rehashidx == -1 */

} dict;

使用链地址头插法发来解决键冲突

Redis rehash

随着操作的不断执行, 哈希表保存的键值对会逐渐地增多或者减少, 为了让哈希表的负载因子(load factor)维持在一个合理的范围之内, 当哈希表保存的键值对数量太多或者太少时, 程序需要对哈希表的大小进行相应的扩展或者收缩。

扩展和收缩哈希表的工作可以通过执行 rehash (重新散列)操作来完成, Redis 对字典的哈希表执行 rehash 的步骤如下:

  1. 为字典的 ht[1] 哈希表分配空间, 这个哈希表的空间大小取决于要执行的操作, 以及 ht[0] 当前包含的键值对数量 (也即是ht[0].used 属性的值):
    • 如果执行的是扩展操作, 那么 ht[1] 的大小为第一个大于等于 ht[0].used * 2 的 2^n (2 的 n 次方幂);
    • 如果执行的是收缩操作, 那么 ht[1] 的大小为第一个大于等于 ht[0].used 的 2^n 。
  2. 将保存在 ht[0] 中的所有键值对 rehash 到 ht[1] 上面: rehash 指的是重新计算键的哈希值和索引值, 然后将键值对放置到 ht[1] 哈希表的指定位置上。
  3. 当 ht[0] 包含的所有键值对都迁移到了 ht[1] 之后 (ht[0] 变为空表), 释放 ht[0] , 将 ht[1] 设置为 ht[0] , 并在 ht[1] 新创建一个空白哈希表, 为下一次 rehash 做准备。

Redis 渐进式reshash

因此, 为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。

以下是哈希表渐进式 rehash 的详细步骤:

  1. 为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。
  2. 在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。
  3. 在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增一。
  4. 随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。

渐进式 rehash 的好处在于它采取分而治之的方式, 将 rehash 键值对所需的计算工作均滩到对字典的每个添加、删除、查找和更新操作上, 从而避免了集中式 rehash 而带来的庞大计算量。

哈希表的扩展与收缩

当以下条件中的任意一个被满足时, 程序会自动开始对哈希表执行扩展操作:

  1. 服务器目前没有在执行 BGSAVE 命令或者 BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1 ;

  2. 服务器目前正在执行 BGSAVE 命令或者 BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5 ;

     # 负载因子 = 哈希表已保存节点数量 / 哈希表大小
     load_factor = ht[0].used / ht[0].size
    

跳跃表


Redis 的跳跃表由 redis.h/zskiplistNode 和 redis.h/zskiplist 两个结构定义, 其中 zskiplistNode 结构用于表示跳跃表节点, 而 zskiplist结构则用于保存跳跃表节点的相关信息, 比如节点的数量, 以及指向表头节点和表尾节点的指针, 等等。

展示了一个跳跃表示例, 位于图片最左边的是 zskiplist 结构, 该结构包含以下属性:

  • header :指向跳跃表的表头节点。
  • tail :指向跳跃表的表尾节点。
  • level :记录目前跳跃表内,层数最大的那个节点的层数(表头节点的层数不计算在内)。
  • length :记录跳跃表的长度,也即是,跳跃表目前包含节点的数量(表头节点不计算在内)。

位于 zskiplist 结构右方的是四个 zskiplistNode 结构, 该结构包含以下属性:

  • 层(level):节点中用 L1 、 L2 、 L3 等字样标记节点的各个层, L1 代表第一层, L2 代表第二层,以此类推。每个层都带有两个属性:前进指针和跨度。前进指针用于访问位于表尾方向的其他节点,而跨度则记录了前进指针所指向节点和当前节点的距离。在上面的图片中,连线上带有数字的箭头就代表前进指针,而那个数字就是跨度。当程序从表头向表尾进行遍历时,访问会沿着层的前进指针进行。
  • 后退(backward)指针:节点中用 BW 字样标记节点的后退指针,它指向位于当前节点的前一个节点。后退指针在程序从表尾向表头遍历时使用。
  • 分值(score):各个节点中的 1.0 、 2.0 和 3.0 是节点所保存的分值。在跳跃表中,节点按各自所保存的分值从小到大排列。
  • 成员对象(obj):各个节点中的 o1 、 o2 和 o3 是节点所保存的成员对象。

跳跃表节点

跳跃表节点的实现由 redis.h/zskiplistNode 结构定义:

typedef struct zskiplistNode {

    // 后退指针
    struct zskiplistNode *backward;

    // 分值
    double score;

    // 成员对象
    robj *obj;

    // 层
    struct zskiplistLevel {

        // 前进指针
        struct zskiplistNode *forward;

        // 跨度
        unsigned int span;

    } level[];

} zskiplistNode;

与红黑树等平衡树相比,跳跃表具有以下优点:

  • 插入速度非常快速,因为不需要进行旋转等操作来维护平衡性;
  • 更容易实现;
  • 支持无锁操作。

整数集合


整数集合(intset)是 Redis 用于保存整数值的集合抽象数据结构, 它可以保存类型为 int16_t 、 int32_t 或者 int64_t 的整数值, 并且保证集合中不会出现重复元素。

每个 intset.h/intset 结构表示一个整数集合:

typedef struct intset {

    // 编码方式
    uint32_t encoding;

    // 集合包含的元素数量
    uint32_t length;

    // 保存元素的数组
    int8_t contents[];

} intset;

contents 数组是整数集合的底层实现: 整数集合的每个元素都是 contents 数组的一个数组项(item), 各个项在数组中按值的大小从小到大有序地排列, 并且数组中不包含任何重复项。

length 属性记录了整数集合包含的元素数量, 也即是 contents 数组的长度。

虽然 intset 结构将 contents 属性声明为 int8_t 类型的数组, 但实际上 contents 数组并不保存任何 int8_t 类型的值 —— contents 数组的真正类型取决于 encoding 属性的值。

升级

每当我们要将一个新元素添加到整数集合里面, 并且新元素的类型比整数集合现有所有元素的类型都要长时, 整数集合需要先进行升级(upgrade), 然后才能将新元素添加到整数集合里面。

升级整数集合并添加新元素共分为三步进行:

  • 根据新元素的类型, 扩展整数集合底层数组的空间大小, 并为新元素分配空间。
  • 将底层数组现有的所有元素都转换成与新元素相同的类型, 并将类型转换后的元素放置到正确的位上, 而且在放置元素的过程中, 需要继续维持底层数组的有序性质不变。
  • 将新元素添加到底层数组里面。

升级的好处:

  • 提升灵活性:

    因为 C 语言是静态类型语言, 为了避免类型错误, 我们通常不会将两种不同类型的值放在同一个数据结构里面。 因为整数集合可以通过自动升级底层数组来适应新元素, 所以我们可以随意地将 int16_t 、 int32_t 或者 int64_t 类型的整数添加到集合中, 而不必担心出现类型错误, 这种做法非常灵活。

  • 节约内存

    整数集合现在的做法既可以让集合能同时保存三种不同类型的值, 又可以确保升级操作只会在有需要的时候进行, 这可以尽量节省内存。

降级

整数集合不支持降级操作, 一旦对数组进行了升级, 编码就会一直保持升级后的状态。

压缩列表

压缩列表是 Redis 为了节约内存而开发的, 由一系列特殊编码的连续内存块组成的顺序型(sequential)数据结构。

一个压缩列表可以包含任意多个节点(entry), 每个节点可以保存一个字节数组或者一个整数值。

图 7-1 展示了压缩列表的各个组成部分, 表 7-1 则记录了各个组成部分的类型、长度、以及用途。

压缩列表节点的构成

每个压缩列表节点都由 previous_entry_length 、 encoding 、 content 三个部分组成。

previous_entry_length

节点的 previous_entry_length 属性以字节为单位, 记录了压缩列表中前一个节点的长度。
previous_entry_length 属性的长度可以是 1 字节或者 5 字节:

  • 如果前一节点的长度小于 254 字节, 那么 previous_entry_length 属性的长度为 1 字节: 前一节点的长度就保存在这一个字节里面。
  • 如果前一节点的长度大于等于 254 字节, 那么 previous_entry_length 属性的长度为 5 字节: 其中属性的第一字节会被设置为 0xFE(十进制值 254), 而之后的四个字节则用于保存前一节点的长度。

encoding

节点的 encoding 属性记录了节点的 content 属性所保存数据的类型以及长度:

  • 一字节、两字节或者五字节长, 值的最高位为 00 、 01 或者 10 的是字节数组编码: 这种编码表示节点的 content 属性保存着字节数组, 数组的长度由编码除去最高两位之后的其他位记录;
  • 一字节长, 值的最高位以 11 开头的是整数编码: 这种编码表示节点的 content 属性保存着整数值, 整数值的类型和长度由编码除去最高两位之后的其他位记录;

content

节点的 content 属性负责保存节点的值, 节点值可以是一个字节数组或者整数, 值的类型和长度由节点的 encoding 属性决定。

其中,字节数组可以是以下三种长度的其中一种:

  • 长度小于等于 63 (2^{6}-1)字节的字节数组;
  • 长度小于等于 16383 (2^{14}-1) 字节的字节数组;
  • 长度小于等于 4294967295 (2^{32}-1)字节的字节数组;

而整数值则可以是以下六种长度的其中一种:

  • 4 位长,介于 0 至 12 之间的无符号整数;
  • 1 字节长的有符号整数;
  • 3 字节长的有符号整数;
  • int16_t 类型整数;
  • int32_t 类型整数;
  • int64_t 类型整数。

连锁更新

在一个压缩列表中, 有多个连续的、长度介于 250 字节到 253 字节之间的节点 e1 至 eN。

因为 e1 至 eN 的所有节点的长度都小于 254 字节, 所以记录这些节点的长度只需要 1 字节长的 previous_entry_length 属性, 换句话说,e1 至 eN 的所有节点的 previous_entry_length 属性都是 1 字节长的。

如果我们将一个长度大于等于 254 字节的新节点 new 设置为压缩列表的表头节点, 那么 new 将成为 e1 的前置节点。

因为 e1 的 previous_entry_length 属性仅长 1 字节, 它没办法保存新节点 new 的长度, 所以程序将对压缩列表执行空间重分配操作, 并将e1 节点的 previous_entry_length 属性从原来的 1 字节长扩展为 5 字节长。

现在, 麻烦的事情来了 —— e1 原本的长度介于 250 字节至 253 字节之间, 在为 previous_entry_length 属性新增四个字节的空间之后, e1的长度就变成了介于 254 字节至 257 字节之间, 而这种长度使用 1 字节长的 previous_entry_length 属性是没办法保存的。

因此, 为了让 e2 的 previous_entry_length 属性可以记录下 e1 的长度, 程序需要再次对压缩列表执行空间重分配操作, 并将 e2 节点的previous_entry_length 属性从原来的 1 字节长扩展为 5 字节长。

正如扩展 e1 引发了对 e2 的扩展一样, 扩展 e2 也会引发对 e3 的扩展, 而扩展 e3 又会引发对 e4 的扩展……为了让每个节点的previous_entry_length 属性都符合压缩列表对节点的要求, 程序需要不断地对压缩列表执行空间重分配操作, 直到 eN 为止。

Redis 将这种在特殊情况下产生的连续多次空间扩展操作称之为“连锁更新”(cascade update)。

对象


Redis 使用对象来表示数据库中的键和值, 每次当我们在 Redis 的数据库中新创建一个键值对时, 我们至少会创建两个对象, 一个对象用作键值对的键(键对象), 另一个对象用作键值对的值(值对象)。

typedef struct redisObject {

    // 类型
    unsigned type:4;

    // 编码
    unsigned encoding:4;

    // 指向底层实现数据结构的指针
    void *ptr;

    // ...

} robj;

类型

对象的 type 属性记录了对象的类型

类型常量 对象的名称
REDIS_STRING 字符串对象
REDIS_LIST 列表对象
REDIS_HASH 哈希对象
REDIS_SET 集合对象
REDIS_ZSET 有序集合对象

编码

对象的 ptr 指针指向对象的底层实现数据结构, 而这些数据结构由对象的 encoding 属性决定。通过 encoding 属性来设定对象所使用的编码, 而不是为特定类型的对象关联一种固定的编码, 极大地提升了 Redis 的灵活性和效率, 因为 Redis 可以根据不同的使用场景来为一个对象设置不同的编码, 从而优化对象在某一场景下的效率。

编码常量 编码所对应的底层数据结构
REDIS_ENCODING_INT long 类型的整数
REDIS_ENCODING_EMBSTR embstr 编码的简单动态字符串
REDIS_ENCODING_RAW 简单动态字符串
REDIS_ENCODING_HT 字典
REDIS_ENCODING_LINKEDLIST 双端链表
REDIS_ENCODING_ZIPLIST 压缩列表
REDIS_ENCODING_INTSET 整数集合
REDIS_ENCODING_SKIPLIST 跳跃表和字典
对象所使用的底层数据结构 编码常量 OBJECT ENCODING 命令输出
整数 REDIS_ENCODING_INT “int”
embstr 编码的简单动态字符串(SDS) REDIS_ENCODING_EMBSTR “embstr”
简单动态字符串 REDIS_ENCODING_RAW “raw”
字典 REDIS_ENCODING_HT “hashtable”
双端链表 REDIS_ENCODING_LINKEDLIST “linkedlist”
压缩列表 REDIS_ENCODING_ZIPLIST “ziplist”
整数集合 REDIS_ENCODING_INTSET “intset”
跳跃表和字典 REDIS_ENCODING_SKIPLIST “skiplist”

字符串对象

字符串对象的编码可以是 int 、 raw 或者 embstr 。

列表对象

列表对象的编码可以是 ziplist 或者 linkedlist 。

当列表对象可以同时满足以下两个条件时, 列表对象使用 ziplist 编码:

  • 列表对象保存的所有字符串元素的长度都小于 64 字节;
  • 列表对象保存的元素数量小于 512 个;

不能满足这两个条件的列表对象需要使用 linkedlist 编码。

注意以上两个条件的上限值是可以修改的, 具体请看配置文件中关于 list-max-ziplist-value 选项和 list-max-ziplist-entries 选项的说明。

哈希对象

哈希对象的编码可以是 ziplist 或者 hashtable 。

ziplist 编码的哈希对象使用压缩列表作为底层实现, 每当有新的键值对要加入到哈希对象时, 程序会先将保存了键的压缩列表节点推入到压缩列表表尾, 然后再将保存了值的压缩列表节点推入到压缩列表表尾, 因此:

  • 保存了同一键值对的两个节点总是紧挨在一起, 保存键的节点在前, 保存值的节点在后;
  • 先添加到哈希对象中的键值对会被放在压缩列表的表头方向, 而后来添加到哈希对象中的键值对会被放在压缩列表的表尾方向。

当哈希对象可以同时满足以下两个条件时, 哈希对象使用 ziplist 编码:

  • 哈希对象保存的所有键值对的键和值的字符串长度都小于 64 字节;
  • 哈希对象保存的键值对数量小于 512 个;

不能满足这两个条件的哈希对象需要使用 hashtable 编码。

注意这两个条件的上限值是可以修改的, 具体请看配置文件中关于 hash-max-ziplist-value 选项和 hash-max-ziplist-entries 选项的说明。

集合对象

集合对象的编码可以是 intset 或者 hashtable 。

hashtable 编码的集合对象使用字典作为底层实现, 字典的每个键都是一个字符串对象, 每个字符串对象包含了一个集合元素, 而字典的值则全部被设置为 NULL 。

当集合对象可以同时满足以下两个条件时, 对象使用 intset 编码:

  • 集合对象保存的所有元素都是整数值;
  • 集合对象保存的元素数量不超过 512 个;

不能满足这两个条件的集合对象需要使用 hashtable 编码。

有序集合对象

有序集合的编码可以是 ziplist 或者 skiplist 。

ziplist 编码的有序集合对象使用压缩列表作为底层实现, 每个集合元素使用两个紧挨在一起的压缩列表节点来保存, 第一个节点保存元素的成员(member), 而第二个元素则保存元素的分值(score)。压缩列表内的集合元素按分值从小到大进行排序, 分值较小的元素被放置在靠近表头的方向, 而分值较大的元素则被放置在靠近表尾的方向。

zset 结构中的 zsl 跳跃表按分值从小到大保存了所有集合元素, 每个跳跃表节点都保存了一个集合元素: 跳跃表节点的 object 属性保存了元素的成员, 而跳跃表节点的 score 属性则保存了元素的分值。

Redis 选择了同时使用字典和跳跃表两种数据结构来实现有序集合。这两种数据结构都会通过指针来共享相同元素的成员和分值, 所以同时使用跳跃表和字典来保存集合元素不会产生任何重复成员或者分值, 也不会因此而浪费额外的内存。

当有序集合对象可以同时满足以下两个条件时, 对象使用 ziplist 编码:

  • 有序集合保存的元素数量小于 128 个;
  • 有序集合保存的所有元素成员的长度都小于 64 字节;

不能满足以上两个条件的有序集合对象将使用 skiplist 编码。

Redis数据库实现

数据库


Redis数据库服务器将所有数据库保存在服务器状态redis.h/redisServer结构的db数据汇总

struct redisServer{
//...
// 保存服务器中所有数据库 数组
redisDb *db;
//	服务器数据库数量,默认为16
int dbnum;

};

Redis 是一个键值对(key-value pair)数据库服务器, 服务器中的每个数据库都由一个 redis.h/redisDb 结构表示, 其中, redisDb 结构的dict 字典保存了数据库中的所有键值对, 我们将这个字典称为键空间(key space):

typedef struct redisDb {

    // ...

    // 数据库键空间,保存着数据库中的所有键值对
    dict *dict;

    // ...

} redisDb;

键的生存时间

  • EXPIRE key ttl
  • PEXPIRE key ttl
  • EXPIREAT key timestamp
  • PEXPIREAT key timestamp

过期键删除策略

  • 定时删除
  • 惰性删除
  • 定期删除

redis服务器实际使用惰性删除和定期删除相结合来删除过期键。

惰性删除:对输入键进行检查,如果过期就删除键。

定期删除:redis服务器周期性执行activeExpireCycle函数,随机检查数据库中的键过期时间,并删除过期键。

RDB持久化


AOF持久化


  AOF 则以协议文本的方式,将所有对数据库进行过写入的命令(及其参数)记录到 AOF
文件,以此达到记录数据库状态的目的。

AOF持久化实现

命令追加

  在AOF持久化功能处于打开状态时,服务器在执行完一个写命令后,或以协议格式将被执行的写命令追加到服务器的aof——buf缓冲区末尾;

    struct redisServer{
        //....
        //aof缓冲区
        sds aod_buf;
    }
AOF文件的写入与同步

  因为服务器在处理文件事件时可能会执行写命令, 使得一些内容被追加到 aof_buf 缓冲区里面, 所以在服务器每次结束一个事件循环之前, 它都会调用 flushAppendOnlyFile 函数, 考虑是否需要将 aof_buf 缓冲区中的内容写入和保存到 AOF 文件里面, 这个过程可以用以下伪代码表示:

def eventLoop():

    while True:

        # 处理文件事件,接收命令请求以及发送命令回复
        # 处理命令请求时可能会有新内容被追加到 aof_buf 缓冲区中
        processFileEvents()

        # 处理时间事件
        processTimeEvents()

        # 考虑是否要将 aof_buf 中的内容写入和保存到 AOF 文件里面
        flushAppendOnlyFile()

  flushAppendOnlyFile 函数的行为由服务器配置的 appendfsync 选项的值来决定, 各个不同值产生的行为如表 TABLE_APPENDFSYNC 所示。

  如果用户没有主动为 appendfsync 选项设置值, 那么 appendfsync 选项的默认值为 everysec , 关于 appendfsync 选项的更多信息, 请参考 Redis 项目附带的示例配置文件 redis.conf 。

AOF文件载入与数据还原

因为AOF文件包含重建数据库状态的所有写命令,所以服务器秩序重新执行AOF文件里保存的写命令。

Redis读取AOF文件并还原数据库状态如下:

  1. 创建一个不带网络连接的为客户端
  2. 从AOF文件分析并读取一条写命令
  3. 使用客户端执行这条写命令
  4. 重复2,3直到AOF文件所有写命令被处理

AOF重写

  因为AOF持久化是通过保存被执行的写命令来记录数据库状态,所以随着服务器运行时间流逝,AOF文件中内容会越来越多,使用AOF文件来进行数据还原所需的时间越多。实际上AOF文件重写是通过读取服务器当前数据库状态来实现的,而不是对现有的AOF文件进行读取分析写入。这样能将对一个键的多个写命令替换为一个写命令。

  因为AOF重写函数会进行大量的写入操作,如果调用这个函数会长时间阻塞,所以redis将aof重写程序放入到子进程执行。使用子进程同时会带来一个问题:在子进程进行AOF重写期间服务器继续处理了写请求,会导致服务器当前状态与重写后的AOF文件保存的数据库状态不一致。

  为了解决这个问题,redis服务器设置了一个AOF重写缓冲区,当redis服务器在重写aof阶段,执行完一个写命令会同时将这个写命令发送给AOF缓冲区和AOF重写缓冲区。在子进程完成AOF重写工作后,父进程会调用程序将AOF重写缓冲区中内容写入到新AOF文件中。

Redis设计与实现


线程池的基本使用

java中实现线程池的类为java.uitl.concurrent.ThreadPoolExecutor继承了AbstractExecutorService类。
AbstractExecutorService类基本方法:

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
ExecutorService接口基本方法:

ExecutorService又是继承了Executor接口。我们看一下Executor接口的实现:

public interface Executor {
    void execute(Runnable command);
}


线程池的创建

ThreadPoolExecutor的四个构造函数:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

每个参数的含义:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线
    程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任
    务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,
    线程池会提前创建并启动所有基本线程。

  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几
    个阻塞队列。

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原
      则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通
      常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用
      移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工
      厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并
    且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如
    果使用了无界的任务队列这个参数就没什么效果。

  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设
    置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线
    程设置有意义的名字,代码如下。

      new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
    
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状
    态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法
    处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。

    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,
    如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟
    (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

  • Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小Integer.MAX_VALUE

  • Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池

  • Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

//newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
//newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
//newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

线程的初始化、提交任务、容量的动态调整和关闭线程池

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化所有核心线程

向线程池提交任务有两个方法:execute和submit。

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现。

  • submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个
    future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方
    法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线
    程一段时间后立即返回,这时候有可能任务没有执行完。submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写。

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:

  • setCorePoolSize()设置核心池大小
  • setMaximumPoolSize(),设置线程池最大能创建的线程数目大小

 当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
关闭线程池有shutdown和shutdownNow两个方法。

简单使用范例:

public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
          
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
             executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
 
class MyTask implements Runnable {
    private int taskNum;
     
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}

线程池原理分析

当提交一个新任务到线程池时,线程池的处理流程如下。

  • 1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作
    线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

  • 2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这
    个工作队列里。如果工作队列满了,则进入下个流程。

  • 3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程
    来执行任务。如果已经满了,则交给饱和策略来处理这个任务

看一下execute的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 如果线程数小于基本线程数,则进入addIfUnderCorePoolSize(command),试图创建线程并执行,当正常是add..返回true,结束,无法执行返回false。
    //如果线程数大于基本线程数,进入if或者上面的add返回false
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
    }
    
    // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,
    // 则创建一个线程执行任务。
    else if (!addIfUnderMaximumPoolSize(command))
        // 抛出RejectedExecutionException异常
        reject(command); // is shutdown or saturated
    }
}

//当线程数低于核心池大小时执行的方法
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //创建线程去执行firstTask任务   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

//用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,
//然后将线程t的引用赋值给了Worker对象的成员变量thread,
//接着通过workers.add(w)将Worker对象添加到工作集当中。
private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务   
    if (t != null) {
        w.thread = t;            //将创建的线程的引用赋值为w的成员变量       
        workers.add(w);
        int nt = ++poolSize;     //当前线程数加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

//work类的run方法,
//首先执行的是通过构造器传进来的任务firstTask,
//在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取
public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

//getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现
//在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null
 //如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

//如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

//然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,
Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
                //则通过poll取任务,若等待一定的时间取不到任务,则返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果没取到任务,即r为null,则判断当前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中断处于空闲状态的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}
//workerCanExit()的实现
//如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大于等于STOP,或者任务缓存队列为空了
    //或者  允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

//interruptIdleWorkers()的实现
//从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法
void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

//在worker的interruptIfIdle()方法
void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
                                //如果成功获取了锁,说明当前worker处于空闲状态
        try {
    		if (thread != Thread.currentThread())  
    ·			thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

//addIfUnderMaximumPoolSize方法的实现
//这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

    

Work类实现:

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
            //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工作       
        }
    }
}

来源自:http://www.cnblogs.com/dolphin0520/p/3932921.html

来源自:java并发编程的艺术


LOCK接口

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时
访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接
口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增
了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功
能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提
供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以
及超时获取锁等多种synchronized关键字所不具备的同步特性。

Lock接口有三个实现类,一个是ReentrantLock,另两个是ReentrantReadWriteLock类中的两个静态内部类ReadLock和WriteLock。

  • 与互斥锁定相比,读-写锁定允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程)。从理论上讲,与互斥锁定相比,使用读-写锁定所允许的并发性增强将带来更大的性能提高。
  • 在实践中,只有在多处理器上并且只在访问模式适用于共享数据时,才能完全实现并发性增强。——例如,某个最初用数据填充并且之后不经常对其进行
    修改的 collection,因为经常对其进行搜索(比如搜索某种目录),所以这样的 collection 是使用读-写锁定的理想候选者。

队列同步器

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组
件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获
取线程的排队工作。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步
器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交
互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,
它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状
态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3
个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部
类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来
供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获
取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、
ReentrantReadWriteLock和CountDownLatch等)。

实现一个lock–通过队列同步器

同步器AQS的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的
方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些
模板方法将会调用使用者重写的方法。

同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放
同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法
来实现自己的同步语义。

下面为自定义的一个lock锁:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    // 静态内部类,自定义同步器,随后模板方法会调用这些方法
    private static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -4387327721959839431L;

        // 是否处于占用状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 当状态为0的时候获取锁
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 释放锁,将状态设置为0
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

重入锁(ReentrantLock)

重入锁就是该锁支持一个线程对资源的重复可利用。除此之外,该锁还支持获取锁的公平与非公平选择。

ReentrantLock中有3个内部类,分别是Sync、FairSync和NonfairSync。

Sync是一个继承AQS的抽象类,使用独占锁,复写了tryRelease方法。tryAcquire方法由它的两个FairSync(公平锁)和NonfairSync(非公平锁)实现。

ReentrantLock的lock方法使用sync的lock方法,Sync的lock方法是个抽象方法,由公平锁和非公平锁去实现。unlock方法直接使用AQS的release方法。所以说公平锁和非公平锁的释放锁过程是一样的,不一样的是获取锁过程。

先看一下公平锁nonfair的lock方法:

final void lock() {
// acquire方法内部调用tryAcquire方法
// 公平锁的获取锁方法,对于没有获取到的线程,会按照队列的方式挂起线程
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 公平锁这里多了一个!hasQueuedPredecessors()判断,表示是否有线程在队列里等待的时间比当前线程要长,如果有等待时间更长的线程,那么放弃获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该
方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释
放锁之后才能继续获取锁。

非公平锁lock方法:

final void lock() {
    // 非公平锁的获取锁
    // 跟公平锁的区别就在这里。直接对状态位state进行cas操作,成功就获取锁,这是一种抢占式的方式。不成功跟公平锁一样进入队列挂起线程
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// 调用Sync的nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来
决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回
true,表示获取同步状态成功。

对于unlock方法直接使用AQS的release方法。公平锁和非公平锁的释放锁过程是一样的,不一样的是获取锁过程。
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 释放
if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程不是独占线程,直接抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 由于是可重入锁,需要判断是否全部释放了
free = true;
setExclusiveOwnerThread(null); // 全部释放的话直接把独占线程设置为null
}
setState(c);
return free;
}

// 恢复线程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;  // 恢复第一个挂起的线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLock的默认构造函数使用的是NonfairSync,如果想使用FairSync,使用带有boolean参数的构造函数,传入true表示FairSync,否则是NonfairSync。

读写锁 ReentrantReadWriteLock

读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读
线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写
锁,使得并发性相比一般的排他锁有了很大提升。

  • 使用范例

      public class Cache {
          static Map<String, Object> map = new HashMap<String, Object>();
          static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
          static Lock r = rwl.readLock();
          static Lock w = rwl.writeLock();
          // 获取一个key对应的value
          public static final Object get(String key) {
              r.lock();
              try {
                  return map.get(key);
              } finally {
                  r.unlock();
              }
          }
          // 设置key对应的value,并返回旧的value
          public static final Object put(String key, Object value) {
              w.lock();
              try {
                  return map.put(key, value);
              } finally {
                  w.unlock();
              }
              }// 清空所有的内容
              public static final void clear() {
                  w.lock();
                  try {
                      map.clear();
                  } finally {
                      w.unlock();
                  }
          }
      }	
    
  • 读写锁实现分析

  • 1 - 读写状态的设计

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。

  • 2 - 写锁的获取与释放

ReentrantReadWriteLock的tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // 存在读锁或者当前获取线程不是已经获取写锁的线程
        if (w == 0 || current != getExclusiveOwnerThread())
        return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
        throw new Error("Maximum lock count exceeded");
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
        return false;
    }
    setExclusiveOwnerThread(current);
    return true;
}

写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0
时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对
后续读写线程可见。

  • 3- 读锁的获取的释放

ReentrantReadWriteLock的tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    for (;;) {
        int c = getState();
        int nextc = c + (1 << 16);
        if (nextc < c)
            throw new Error("Maximum lock count exceeded");
        if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
            return -1;
        if (compareAndSetState(c, nextc))
            return 1;
    }
}

在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读
锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,
依靠CAS保证)增加读状态,成功获取读锁。
读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的
值是(1<<16)

Condition接口


简单使用:

public class BoundedQueue<T> {
    private Object[] items;
    // 添加的下标,删除的下标和数组当前
    private int addIndex, removeI
    private Lock lock = new R
    private Condition notEmpty
    private Condition notFull
    public BoundedQueue(int size)
        items = new Object[si
    }
    // 添加一个元素,如果数组满,则添加线
    public void add(T t) throws I
        lock.lock();
        try {
            while (count == items.length)
            notFull.await();
            items[addIndex] = t;
            if (++addIndex == items.length)
            addIndex = 0;
            ++count;
            notEmpty.signal();
        } finally {
            ock.unlock();
        }
    }
    // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
            notEmpty.await();
            Object x = items[removeIndex];
            if (++removeIndex == items.length)
            removeIndex = 0;
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

全文来自 java并发编程的艺术

队列同步器

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组
件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获
取线程的排队工作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状
态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3
个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操
作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部
类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来
供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获
取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、
ReentrantReadWriteLock和CountDownLatch等)。

队列同步器的接口与示例

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的
方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些
模板方法将会调用使用者重写的方法。

以下是重写同步器方法实现的一个独占锁。

class Mutex implements Lock {

    // 静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
            // 是否处于占用状态
            protected boolean isHeldExclusively() {
                return getState() == 1;
            } 

            // 当状态为0的时候获取锁
            public boolean tryAcquire(int acquires) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            } 
            // 释放锁,将状态设置为0
            protected boolean tryRelease(int releases) {
                if (getState() == 0) throw new  IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            } 

            // 返回一个Condition,每个condition都包含了一个condition队列
            Condition newCondition() { return new ConditionObject(); }
    } 

    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();
    public void lock() { sync.acquire(1); }
    public boolean tryLock() { return sync.tryAcquire(1); }
    public void unlock() { sync.release(1); }
    public Condition newCondition() { return sync.newCondition(); }
    public boolean isLocked() { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    } 
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

队列同步器的实现分析

同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取
同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其
加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再
次尝试获取同步状态。

节点是构成同步队列的基础。

同步器拥有首节点(head)
和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的
基本结构如图示。

同步队列在加入尾结点需要cas来保证线程安全,而设置头结点不需要。

独占式同步状态获取与释放

主要流程为:

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是
由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同
步队列中移出。

//同步器的acquire方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等
待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法
保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式
Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)
方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该
节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的
唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

//同步器的addWaiter和enq方法
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速尝试在尾部添加
    Node pred = tail;if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
            tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
            t.next = node;
            return t;
            }
        }
    }
}

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能
够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释
放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

总结:在获取同步状态时,同步器维
护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列
(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步
器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状
态。

通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    if (interrupted)selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状
态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同
步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是
tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自
旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示
该次获取同步状态成功并从自旋过程中退出。

与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以
释放同步状态。方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线
程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于tryReleaseShared(int arg)
方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为
释放同步状态的操作会同时来自多个线程。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状
态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该
方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。

超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”,
doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的
特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,
nanosTimeout计算公式为:nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上
次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,
反之,表示已经超时。

private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout <= 0) return false;
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            //计算时间,当前时间now减去睡眠之前的时间lastTime得到已经睡眠
            //的时间delta,然后被原有超时时间nanosTimeout减去,得到了
            //还应该睡眠的时间
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功
则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理
上有所不同。如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout小于等于0表示
已经超时),如果没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待
nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object
blocker,long nanos)方法返回)。

如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行
超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果
这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超
时非常短的场景下,同步器会进入无条件的快速自旋。

独占式超时获取同步态的流程如图5所示。


全文来自 Java并发编程的艺术

solr的安装与配置


solr简介

  Solr(读作“solar”)是Apache Lucene项目的开源企业搜索平台。其主要功能包括全文检索、命中标示[1]、分面搜索、动态聚类、数据库集成,以及富文本(如Word、PDF)的处理。Solr是高度可扩展的,并提供了分布式搜索和索引复制。Solr是最流行的企业级搜索引擎,[2]Solr 4还增加了NoSQL支持。[3]

  Solr是用Java编写、运行在Servlet容器(如Apache Tomcat或Jetty)的一个独立的全文搜索服务器。 Solr采用了Lucene Java搜索库为核心的全文索引和搜索,并具有类似REST的HTTP/XML和JSON的API。 Solr强大的外部配置功能使得无需进行Java编码,便可对其进行调整以适应多种类型的应用程序。Solr有一个插件架构,以支持更多的高级定制。

solr的安装

  1. 下载solr 8.0版并解压 ,下载地址为: https://www-eu.apache.org/dist/lucene/solr/8.0.0/

  2. 下载并安装tomcat

  3. 将解压后solr-8.0.0\server\solr-webapp\webapp这个文件夹复制到tomcat的apache-tomcat-9.0.14\webapps文件夹下,并改名为solr(为了方便访问)

  4. 把solr下example/lib/ext 目录下的所有的 jar 包,添加到 solr 的工程中(\WEB-INF\lib目录下)。

  5. 在任意位置创建solr-home 目录如D:\solrhome,然后将solr-8.0.0\server\solr文件夹复制到该目录下

  6. 创建collection:在solr目录下创建文件夹firstcore(也就是集合),将\solr\configsets\sample_techproducts_configs下的conf文件复制到该目录下

  7. 关联 solr 及 solrhome。需要修改 solr 工程的 web.xml 文件。

     <env-entry>
        <env-entry-name>solr/home</env-entry-name>
        <env-entry-value>d:\solrhome</env-entry-value>
        <env-entry-type>java.lang.String</env-entry-type>
     </env-entry>
    
  8. 启动tomcat apache-tomcat-9.0.14\bin\startup.bat

配置中文分析器IK Analyzer

下载IK Analyzer:

   因为solr为8.0版本,网上常见的IK Analyzer版本为IKAnalyzer2012FF_u1.jar无法使用,需要下载最新版本,下载网址为:https://github.com/magese/ik-analyzer-solr7

在solr中配置IK Analyzer

  1. 将resources目录下的5个配置文件放入的solr的服务Jetty或Tomcat的webapp/solr/WEB-INF/classes/目录下;

     ①IKAnalyzer.cfg.xml 
     ②ext.dic 
     ③stopword.dic 
     ④ik.conf 
     ⑤dynamicdic.txt
    
  2. 配置的Solr的managed-schema(路径为solr\firstCore\conf),添加ik分词器,示例如下;

     <! -  ik分词器 - > 
     <fieldType name =“text_ik”class =“solr.TextField”> 
       <analyzer type =“index”> 
           <tokenizer class =“org.wltea.analyzer.lucene.IKTokenizerFactory”useSmart = “false”conf =“ik.conf”/> 
           <filter class =“solr.LowerCaseFilterFactory”/> 
       </ analyzer> 
       <analyzer type =“query”> 
           <tokenizer class =“org.wltea.analyzer.lucene.IKTokenizerFactory” useSmart =“true”conf =“ik.conf”/> 
           <filter class =“solr.LowerCaseFilterFactory”/> 
       </ analyzer> 
     </ fieldType>
    

配置域

  域相当于数据库的表字段,用户存放数据,因此用户根据业务需要去定义相关的Field(域),一般来说,每一种对应着一种数据,用户对同一种数据进行相同的操作。

  域的常用属性:

•	name:指定域的名称
•	type:指定域的类型
•	indexed:是否索引
•	stored:是否存储
•	required:是否必须
•	multiValued:是否多值

修改solrhome的schema.xml文件,添加field

<field name="item_title" type="text_ik" indexed="true" stored="true"/>

复制域

复制域的作用在于将某一个Field中的数据复制到另一个域中(可以用于多字段搜索)

<field name="item_keywords" type="text_ik" indexed="true" stored="false" multiValued="true"/>
<copyField source="item_title" dest="item_keywords"/>
<copyField source="item_name" dest="item_keywords"/>

动态域

当我们需要动态扩充字段时,我们需要使用动态域。需要实现的效果如下:

<dynamicField name="item_spec_*" type="string" indexed="true" stored="true" />	

Spring Data Solr的简单使用

入门demo

  1. 创建maven工程,pom.xml中引入依赖

     <dependencies>
         <dependency>
             <groupId>org.springframework.data</groupId>
             <artifactId>spring-data-solr</artifactId>
             <version>1.5.5.RELEASE</version>
         </dependency> 
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-test</artifactId>
             <version>4.2.4.RELEASE</version>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.9</version>
         </dependency>
     </dependencies>
    
  2. 在src/main/resources下创建 applicationContext-solr.xml

     <?xml version="1.0" encoding="UTF-8"?>
     <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:solr="http://www.springframework.org/schema/data/solr"
         xsi:schemaLocation="http://www.springframework.org/schema/data/solr 
               http://www.springframework.org/schema/data/solr/spring-solr-1.0.xsd
             http://www.springframework.org/schema/beans 
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context 
             http://www.springframework.org/schema/context/spring-context.xsd">
         <!-- solr服务器地址,记得加上集合名 -->
         <solr:solr-server id="solrServer" url="http://127.0.0.1:8080/solr/firstcore" />
         <!-- solr模板,使用solr模板可对索引库进行CRUD的操作 -->
         <bean id="solrTemplate" class="org.springframework.data.solr.core.SolrTemplate">
             <constructor-arg ref="solrServer" />
         </bean>
     </beans>
    
  3. 将需要导入solr的实体类添加注解@Field

    如果属性与配置文件定义的域名称不一致,需要在注解中指定域名称。如同下面的title,在solr的配置文件中名为item_title, 所以要加上注解@Field("item_titel"),其他也一样。()

     public class TbItem implements Serializable{
    
         @Field
         private Long id;
     
         @Field("item_title")
         private String title;
             
         @Field("item_price")
         private BigDecimal price;
     
        
     }
    
  4. 创建测试类TestTemplate.java

     @RunWith(SpringJUnit4ClassRunner.class)
     @ContextConfiguration(locations="classpath:applicationContext-solr.xml")
     public class TestTemplate {
     
         @Autowired
         private SolrTemplate solrTemplate;
    
         //增加到solr索引库
         @Test
         public void testAdd(){
             TbItem item=new TbItem();
             item.setId(1L);
             item.setTitle("barktegh");
             item.setPrice(new BigDecimal(2000));		
             solrTemplate.saveBean(item);
             solrTemplate.commit();
         }
    
         //按主键查询
         @Test
         public void testFindOne(){
             TbItem item = solrTemplate.getById(1, TbItem.class);
             System.out.println(item.getTitle());
         }
    
         //循环项solr插入数据
         @Test
         public void testAddList(){
             List<TbItem> list=new ArrayList();
             
             for(int i=0;i<100;i++){
                 TbItem item=new TbItem();
                 item.setId(i+1L);
                 item.setTitle("华为Mate"+i);
                 item.setPrice(new BigDecimal(2000+i));	
                 list.add(item);
             }
             
             solrTemplate.saveBeans(list);
             solrTemplate.commit();
         }
    
         //分页查询
         @Test
         public void testPageQuery(){
             Query query=new SimpleQuery("*:*");
             query.setOffset(20);//开始索引(默认0)
             query.setRows(20);//每页记录数(默认10)
             ScoredPage<TbItem> page = solrTemplate.queryForPage(query, TbItem.class);
             System.out.println("总记录数:"+page.getTotalElements());
             List<TbItem> list = page.getContent();
             showList(list);
         }	
    
    
    
         //条件查询
         @Test
         public void testPageQueryMutil(){	
             Query query=new SimpleQuery("*:*");
             Criteria criteria=new Criteria("item_title").contains("2");
             criteria=criteria.and("item_title").contains("5");		
             query.addCriteria(criteria);
             //query.setOffset(20);//开始索引(默认0)
             //query.setRows(20);//每页记录数(默认10)
             ScoredPage<TbItem> page = solrTemplate.queryForPage(query, TbItem.class);
             System.out.println("总记录数:"+page.getTotalElements());
             List<TbItem> list = page.getContent();
             showList(list);
         }
         
         //删除全部数据
         @Test
         public void testDeleteAll(){
             Query query=new SimpleQuery("*:*");
             solrTemplate.delete(query);
             solrTemplate.commit();
         }
    
    
         //显示记录数据
         private void showList(List<TbItem> list){		
             for(TbItem item:list){
                 System.out.println(item.getTitle() +item.getPrice());
             }		
         }
    
     }
    
  5. http://127.0.0.1:8080/solr 可以查询到数据变化

HashMap介绍


前言

HashMap是一个经典的key-value结构,是线程不安全的。如果要使用线程安全的hashMap可以使用并发包里的ConcurrentHashMap。
jdk在1.7和1.8的具体实现稍有不同。

HaspMap在1.7的实现

基本数据结构和变量:

hashmap的内部数据结构是一个Entry数组transient Node<K,V>[] table(被关键citransient修饰是为了序列化时只用序列化已使用的数据)。

其中Entry是一个内部类,源码如下:

static class Entry<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    V value;
    Entry<K,V> next;

    Entry(int hash, K key, V value, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    public final K getKey()        { return key; }
    public final V getValue()      { return value; }
    public final String toString() { return key + "=" + value; }

    public final int hashCode() {
        return Objects.hashCode(key) ^ Objects.hashCode(value);
    }

    public final V setValue(V newValue) {
        V oldValue = value;
        value = newValue;
        return oldValue;
    }

    public final boolean equals(Object o) {
        if (o == this)
            return true;
        if (o instanceof Map.Entry) {
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            if (Objects.equals(key, e.getKey()) &&
                Objects.equals(value, e.getValue()))
                return true;
        }
        return false;
    }
}

Entry主要有四个成员变量:

  • key就是键
  • value 是值
  • hash存放当前key的hashcode
  • next用于实现链表

HashMap还有一些核心变量如下:

  1. static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;

    初始化桶的大小,默认为16。选择2的n次方是为了在扩容是计算hashcode将取模运算转为位运算。

     static final int hash(Object key) {
         int h;
         return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
     }
    
  2. static final int MAXIMUM_CAPACITY = 1 << 30;

    桶最大值

  3. static final float DEFAULT_LOAD_FACTOR = 0.75f;

    默认装载因子0.75

  4. transient int size;

    Map存放数量的大小

  5. int threshold;

    桶大小,可在初始化时显式指定

  6. final float loadFactor;

    装载因子,可在初始化时显式指定。
    其中当map的数量达到threshold*loadFactor时,就需要对map就行扩容。

put方法

HashMap存放数据1.7源码如下:

public V put(K key, V value) {
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
}

流程如下:

  1. 判断当前数组是否需要初始化。

  2. 如果 key 为空,则 put 一个空值进去。

  3. 根据 key 计算出 hashcode。

  4. 根据计算出的 hashcode 定位出所在桶。

  5. 如果桶是一个链表则需要遍历判断里面的 hashcode、key 是否和传入 key 相等,如果相等则进行覆盖,并返回原来的值。

  6. 如果桶是空的或者没有找到key,说明当前位置没有数据存入;新增一个 Entry 对象写入当前位置。

     void addEntry(int hash, K key, V value, int bucketIndex) {
         if ((size >= threshold) && (null != table[bucketIndex])) {
             resize(2 * table.length);
             hash = (null != key) ? hash(key) : 0;
             bucketIndex = indexFor(hash, table.length);
         }
         createEntry(hash, key, value, bucketIndex);
     }
     void createEntry(int hash, K key, V value, int bucketIndex) {
         Entry<K,V> e = table[bucketIndex];
         table[bucketIndex] = new Entry<>(hash, key, value, e);
         size++;
     }
    

get方法

HashMap获取数据1.7源码如下:

public V get(Object key) {
    if (key == null)
        return getForNullKey();
    Entry<K,V> entry = getEntry(key);
    return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }
    int hash = (key == null) ? 0 : hash(key);
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

流程如下:

  1. 根据key计算出hashcode,定位到桶的位置
  2. 如果桶为空,直接返回null
  3. 不然遍历该桶,比较key,value,hash值是否相等;如果相等就返回值,不然返回null

HaspMap在1.8的实现

1.8相比较1.7的改变主要在以下几个点:

  1. 将Entry更改为Node
  2. 在链表长度超过8时就更改为红黑树,加快查询。
  3. 添加static final int TREEIFY_THRESHOLD = 8变量。
  4. 链表头插法改为尾插法(保持原来的顺序),就要为了解决并发put导致resize出现死循环。

以上这些改变都是为了解决hash冲突时链表长度过长,导致查询效率下降的问题,通过将长度超过8的链表转为红黑树来加快查询。此时的数据结构如下:

put方法

HashMap存放数据1.8源码:

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}

/**
 * Implements Map.put and related methods.
 *
 * @param hash hash for key
 * @param key the key
 * @param value the value to put
 * @param onlyIfAbsent if true, don't change existing value
 * @param evict if false, the table is in creation mode.
 * @return previous value, or null if none
 */
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
    else {
        Node<K,V> e; K k;
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            for (int binCount = 0; ; ++binCount) {
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        if (e != null) { // existing mapping for key
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

流程如下:

  1. 判断桶数组是否为空;如果为空通过resize()来初始化桶数组。
  2. 否则通过hash定位到桶位置;若果桶为空,新建一个Node节点(或者说新桶)。
  3. 否则先判断当前桶的hash,key与写入的hash,key是否相等;相等的话将值赋予给e;
  4. 如果当前桶是红黑树,就以红黑树的方式写入;
  5. 如果是个链表,如果在遍历过程中找到 key 相同时直接退出遍历。
  6. 如果没有找到相同的节点,就需要将当前的 key、value 封装成一个新节点写入到当前桶的后面(形成链表)赋予给e,接着判断当前链表的大小是否大于预设的阈值,大于时就要转换为红黑树。
  7. 判断,如果 e != null 就相当于存在相同的 key,那就需要将值覆盖。
  8. 最后判断是否需要进行扩容。

get方法

HashMap获取数据1.8源码如下:

public V get(Object key) {
    Node<K,V> e;
    return (e = getNode(hash(key), key)) == null ? null : e.value;
}

/**
 * Implements Map.get and related methods.
 *
 * @param hash hash for key
 * @param key the key
 * @return the node, or null if none
 */
final Node<K,V> getNode(int hash, Object key) {
    Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (first = tab[(n - 1) & hash]) != null) {
        if (first.hash == hash && // always check first node
            ((k = first.key) == key || (key != null && key.equals(k))))
            return first;
        if ((e = first.next) != null) {
            if (first instanceof TreeNode)
                return ((TreeNode<K,V>)first).getTreeNode(hash, key);
            do {
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    return e;
            } while ((e = e.next) != null);
        }
    }
    return null;
}

流程如下:

  1. 首先将 key hash 之后取得所定位的桶。
  2. 如果桶为空则直接返回 null 。
  3. 否则判断桶的第一个位置(有可能是链表、红黑树)的 key 是否为查询的 key,是就直接返回 value。
  4. 如果第一个不匹配,则判断它的下一个是红黑树还是链表。
  5. 红黑树就按照树的查找方式返回值。
  6. 不然就按照链表的方式遍历匹配返回值。

ConcurrentHashMap


ConcurrentHashMap在1.7中的实现

基本数据结构

ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock,不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理。无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。特别地,在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设为16),及任意数量线程的读操作。

ConcurrentHashMap的高效并发机制是通过以下三方面来保证的:

  1. 通过锁分段技术保证并发环境下的写操作;

  2. 通过 HashEntry的不变性、Volatile变量的内存可见性和加锁重读机制保证高效、安全的读操作;

  3. 通过不加锁和加锁两种方案控制跨段操作的的安全性。

主要变量如下:

/**
 * Mask value for indexing into segments. The upper bits of a
 * key's hash code are used to choose the segment.
 */
final int segmentMask;  // 用于定位段,大小等于segments数组的大小减 1,是不可变的

/**
 * Shift value for indexing within segments.
 */
final int segmentShift;    // 用于定位段,大小等于32(hash值的位数)减去对segments的大小取以2为底的对数值,是不可变的

/**
 * The segments, each of which is a specialized hash table
 */
final Segment<K,V>[] segments;   // ConcurrentHashMap的底层结构是一个Segment数组

Segment类主要组成:

// 
static final class Segment<K,V> extends ReentrantLock implements Serializable {

    /**
     * The number of elements in this segment's region.
     */
    transient volatile int count;    // Segment中元素的数量,可见的

    /**
     * Number of updates that alter the size of the table. This is
     * used during bulk-read methods to make sure they see a
     * consistent snapshot: If modCounts change during a traversal
     * of segments computing size or checking containsValue, then
     * we might have an inconsistent view of state so (usually)
     * must retry.
     */
    transient int modCount;  //对count的大小造成影响的操作的次数(比如put或者remove操作)

    /**
     * The table is rehashed when its size exceeds this threshold.
     * (The value of this field is always <tt>(int)(capacity *
     * loadFactor)</tt>.)
     */
    transient int threshold;      // 阈值,段中元素的数量超过这个值就会对Segment进行扩容

    /**
     * The per-segment table.
     */
    transient volatile HashEntry<K,V>[] table;  // 链表数组

    /**
     * The load factor for the hash table.  Even though this value
     * is same for all segments, it is replicated to avoid needing
     * links to outer object.
     * @serial
     */
    final float loadFactor;  // 段的负载因子,其值等同于ConcurrentHashMap的负载因子

    ...
}

HashEntry类

 /**
 * HashMap 中的 Entry 类
 */
static class Entry<K,V> implements Map.Entry<K,V> {
    final K key;
    V value;
    Entry<K,V> next;
    final int hash;

    /**
     * Creates new entry.
     */
    Entry(int h, K k, V v, Entry<K,V> n) {
        value = v;
        next = n;
        key = k;
        hash = h;
    }
    ...
}

put方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

流程如下:

  1. 根据key的hash值定位到Segment

  2. 利用 scanAndLockForPut() 获得Segment的锁

    • 尝试自旋获取锁。
    • 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。
  3. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。

  4. 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。

  5. 最后会解除所获取当前 Segment 的锁。

get方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

ConcurrentHashMap在1.8中的实现

1.8相较于1.7更改主要在一下几个点:

  • 抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性
  • 存放数据的 HashEntry 改为 Node,但作用都是相同的
  • val next 都用了 volatile 修饰,保证了可见性
  • 链表长度超过8就采用红黑树来实现

主要引用:

HashMap? ConcurrentHashMap? 相信看完这篇没人能难住你

Map 综述(三):彻头彻尾理解 ConcurrentHashMap

#使用Standford Core NLP的几种方法

stanford corenlp 2018-02-27
java环境: 1.8 64位

因为斯坦福corenlp最新版都推荐使用服务器来运行,且nltk包不在支持,直接调用会出现一些问题,建议使用服务器来使用


##1 使用Core nlp服务器,在来连接服务器

  • 创建Core nlp服务器

    在命令行中进入corenlp文件所在位置,输入开启服务器命令

      e:
      cd E:\corenlp\stanford-corenlp-full-2018-02-27 
      java -mx4g -cp "*" edu.stanford.nlp.pipeline.StanfordCoreNLPServer -port 9000 -timeout 15000
    
  • 使用nltk(3.3v以上)包连接服务器
    API网址:https://www.nltk.org/_modules/nltk/parse/corenlp.html
    >>> from nltk.parse import CoreNLPParser

    # Lexical Parser
    >>> parser = CoreNLPParser(url=’http://localhost:9000‘)

    # Parse tokenized text.
    >>> list(parser.parse(‘What is the airspeed of an unladen swallow ?’.split()))
    [Tree(‘ROOT’, [Tree(‘SBARQ’, [Tree(‘WHNP’, [Tree(‘WP’, [‘What’])]), Tree(‘SQ’, [Tree(‘VBZ’, [‘is’]), Tree(‘NP’, [Tree(‘NP’, [Tree(‘DT’, [‘the’]), Tree(‘NN’, [‘airspeed’])]), Tree(‘PP’, [Tree(‘IN’, [‘of’]), Tree(‘NP’, [Tree(‘DT’, [‘an’]), Tree(‘JJ’, [‘unladen’])])]), Tree(‘S’, [Tree(‘VP’, [Tree(‘VB’, [‘swallow’])])])])]), Tree(‘.’, [‘?’])])])]

    # Parse raw string.
    >>> list(parser.raw_parse(‘What is the airspeed of an unladen swallow ?’))
    [Tree(‘ROOT’, [Tree(‘SBARQ’, [Tree(‘WHNP’, [Tree(‘WP’, [‘What’])]), Tree(‘SQ’, [Tree(‘VBZ’, [‘is’]), Tree(‘NP’, [Tree(‘NP’, [Tree(‘DT’, [‘the’]), Tree(‘NN’, [‘airspeed’])]), Tree(‘PP’, [Tree(‘IN’, [‘of’]), Tree(‘NP’, [Tree(‘DT’, [‘an’]), Tree(‘JJ’, [‘unladen’])])]), Tree(‘S’, [Tree(‘VP’, [Tree(‘VB’, [‘swallow’])])])])]), Tree(‘.’, [‘?’])])])]

    # Neural Dependency Parser
    >>> from nltk.parse.corenlp import CoreNLPDependencyParser
    >>> dep_parser = CoreNLPDependencyParser(url=’http://localhost:9000‘)
    >>> parses = dep_parser.parse(‘What is the airspeed of an unladen swallow ?’.split())
    >>> [[(governor, dep, dependent) for governor, dep, dependent in parse.triples()] for parse in parses]
    [[((‘What’, ‘WP’), ‘cop’, (‘is’, ‘VBZ’)), ((‘What’, ‘WP’), ‘nsubj’, (‘airspeed’, ‘NN’)), ((‘airspeed’, ‘NN’), ‘det’, (‘the’, ‘DT’)), ((‘airspeed’, ‘NN’), ‘nmod’, (‘swallow’, ‘VB’)), ((‘swallow’, ‘VB’), ‘case’, (‘of’, ‘IN’)), ((‘swallow’, ‘VB’), ‘det’, (‘an’, ‘DT’)), ((‘swallow’, ‘VB’), ‘amod’, (‘unladen’, ‘JJ’)), ((‘What’, ‘WP’), ‘punct’, (‘?’, ‘.’))]]


    # Tokenizer 分词
    >>> parser = CoreNLPParser(url=’http://localhost:9000‘)
    >>> list(parser.tokenize(‘What is the airspeed of an unladen swallow?’))
    [‘What’, ‘is’, ‘the’, ‘airspeed’, ‘of’, ‘an’, ‘unladen’, ‘swallow’, ‘?’]

    # POS Tagger 词性标注
    >>> pos_tagger = CoreNLPParser(url=’http://localhost:9000‘, tagtype=’pos’)
    >>> list(pos_tagger.tag(‘What is the airspeed of an unladen swallow ?’.split()))
    [(‘What’, ‘WP’), (‘is’, ‘VBZ’), (‘the’, ‘DT’), (‘airspeed’, ‘NN’), (‘of’, ‘IN’), (‘an’, ‘DT’), (‘unladen’, ‘JJ’), (‘swallow’, ‘VB’), (‘?’, ‘.’)]

    # NER Tagger 命名实体识别
    >>> ner_tagger = CoreNLPParser(url=’http://localhost:9000‘, tagtype=’ner’)
    >>> list(ner_tagger.tag((‘Rami Eid is studying at Stony Brook University in NY’.split())))
    [(‘Rami’, ‘PERSON’), (‘Eid’, ‘PERSON’), (‘is’, ‘O’), (‘studying’, ‘O’), (‘at’, ‘O’), (‘Stony’, ‘ORGANIZATION’), (‘Brook’, ‘ORGANIZATION’), (‘University’, ‘ORGANIZATION’), (‘in’, ‘O’), (‘NY’, ‘STATE_OR_PROVINCE’)]

  • 或者使用from stanfordcorenlp import StanfordCoreNLP,网址为https://github.com/Lynten/stanford-corenlp
    nlp = StanfordCoreNLP(‘http://localhost‘, port=9000)
    # 常用api
    print ‘Tokenize:’, nlp.word_tokenize(sentence)
    print ‘Part of Speech:’, nlp.pos_tag(sentence)
    print ‘Named Entities:’, nlp.ner(sentence)
    print ‘Constituency Parsing:’, nlp.parse(sentence)
    print ‘Dependency Parsing:’, nlp.dependency_parse(sentence)

    # 一般的
    # annotators: tokenize, ssplit, pos, lemma, ner, parse, depparse, dcoref (See Detail)

    # pipelineLanguage: en, zh, ar, fr, de, es (English, Chinese, Arabic, French, German, Spanish) (See Annotator Support Detail)

    # outputFormat: json, xml, text
    props = {‘annotators’: ‘tokenize, ssplit, ner, depparse’, ‘pipelineLanguage’: ‘en’, ‘outputFormat’: ‘json’}
    ParseResult = nlp.annotate(raw_sent, properties=props)


    nlp.close()#别忘了关闭!后端服务器将消耗大量的memery。

2 不使用服务器(不推荐,以不更新)

使用nltk直接调用包
2018-12-28测试发现侧方法会出现utf8解析问题,暂时无法解决,推荐使用上面的方法。
# java路径,需要修改
java_path = “C:\Program Files\Java\jre1.8.0_191\bin\java.exe”
os.environ[‘JAVA_HOME’] = java_path
os.environ[“STANFORD_PARSER”] = Utils.get_project_path()+”\lib_data\stanford-parser-full-2018-02-27\stanford-parser.jar”
os.environ[“STANFORD_MODELS”] = Utils.get_project_path()+”\lib_data\stanford-parser-full-2018-02-27\stanford-parser-3.9.1-models.jar”

self.sent_parser = stanford.StanfordParser(
                             model_path="edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz",
                           java_options='-mx8g',
                            encoding='utf8')  #'-mlength100'
t1 = list(self.sent_parser.parse((senten,))

Spring装配Bean


Spring容器配置的三种主要的装配机制:

  • 在XML中进行显示配置
  • 在Java中进行显式配置
  • 隐式的bean发现机制和自动装配

1-自动装配

Spring从两个角度实现自动化装配:

  • 组件扫描(component scanning) 自动发现应用上下文中所创建的bean
  • 自动装配(autowiring) Spring自动满足bean之间的依赖

使用@Component注解来告诉Spring为这个类创建bean

@Component
public class CDPlayer implements MediaPlayer {
  private CompactDisc cd;

  @Autowired
  public CDPlayer(CompactDisc cd) {
    this.cd = cd;
  }

  public void play() {
    cd.play();
  }

}

通过设置组件扫描可以不用显示配置bean。但是组件扫描默认是不启用的。所以要通过配置来启用。

  • 在java配置类中开启组件扫描@ComponentScan

      @Configuration
      @ComponentScan
      public class CDPlayerConfig { 
      }
    

因为@ComponentScan会扫描与配置类相同的包。可以通过下面两个配置来设置扫描的包:

  • @ComponentScan(basePackages={“soundsystem,”sdd”})

    扫描包的名字

  • @ComponentScan(basePackageClasses={CDplayer.class,DVDPlayer.class})

    扫描类所在的包

  • 在XML中开启组件扫描

      <context:component-scan base-package="soundsystem" />
    
  • 为组件扫描的bean命名

    • 通过@Component注解

    @Component(“sgtPerr”)
    public class SgtPeppers implements CompactDisc {

    private String title = "Sgt. Pepper's Lonely Hearts Club Band";  
    private String artist = "The Beatles";
    
    public void play() {
      System.out.println("Playing " + title + " by " + artist);
    }
    

    }

  • 通过@Named注解
@Named("sgtPerr")
public class SgtPeppers implements CompactDisc {

}

2- JAVA代码装配Bean

通过配置类来实现装配Bean

@Configuration //表明这是一个配置类
public class CDPlayerConfig {
  
  @Bean //声明这是一个bean
  public CompactDisc compactDisc() {
    return new SgtPeppers();
  }

  @Bean
    //通过构造器注入
  public CDPlayer cdPlayer(CompactDisc compactDisc) {
    return new CDPlayer(compactDisc);
  }

  @Bean 
    //通过setter方法注入
  public CDPlayer cdPlayer(CompactDisc compactDisc) {
    CDPlayer cdPlayer = new CDPlayer(compactDisc);
    cdPlayer.setCompactDisc(compactDisc);
    return cdPlayer
  }

}

3- 通过XML装配bean

  • 通过构造器注入
<bean id="compactDisc" class="soundsystem.collections.BlankDisc">
    <constructor-arg value="Sgt. Pepper's Lonely Hearts Club Band" />
    <constructor-arg value="The Beatles" />
    <constructor-arg>
      <list>
        <value>Sgt. Pepper's Lonely Hearts Club Band</value>
        <value>With a Little Help from My Friends</value>
        <value>Lucy in the Sky with Diamonds</value>
        <value>Getting Better</value>
        <value>Fixing a Hole</value>
        <value>She's Leaving Home</value>
        <value>Being for the Benefit of Mr. Kite!</value>
        <value>Within You Without You</value>
        <value>When I'm Sixty-Four</value>
        <value>Lovely Rita</value>
        <value>Good Morning Good Morning</value>
        <value>Sgt. Pepper's Lonely Hearts Club Band (Reprise)</value>
        <value>A Day in the Life</value>
      </list>
    </constructor-arg>
  </bean>
  • 通过setter注入
 <bean id="compactDisc"
    class="soundsystem.properties.BlankDisc">
    <property name="title" value="Sgt. Pepper's Lonely Hearts Club Band" />
    <property name="artist" value="The Beatles" />
    <property name="tracks">
      <list>
        <value>Sgt. Pepper's Lonely Hearts Club Band</value>
        <value>With a Little Help from My Friends</value>
        <value>Lucy in the Sky with Diamonds</value>
        <value>Getting Better</value>
        <value>Fixing a Hole</value>
        <value>She's Leaving Home</value>
        <value>Being for the Benefit of Mr. Kite!</value>
        <value>Within You Without You</value>
        <value>When I'm Sixty-Four</value>
        <value>Lovely Rita</value>
        <value>Good Morning Good Morning</value>
        <value>Sgt. Pepper's Lonely Hearts Club Band (Reprise)</value>
        <value>A Day in the Life</value>
      </list>
    </property>
  </bean>

4-混合配置

  • 在javaConfig引用xml配置

      @Configuration
      @Import(CDPlayerConfig.class) //导入其他的java配置
      @ImportResource("classpath:cd-config.xml") // 导入xml配置
      public class SoundSystemConfig {
      
      }
    
  • 在xml配置中引用javaconfig

      <bean class="soundsystem.CDConfig" /> 导入config配置类
      <import resource="cd-player-config.xml"/> 导入xml配置
    

##高级装配

1- 环境与profile

Java多线程简单使用

继承Thread类

通过继承Thread,重写run方法

public class MyThread extends Thread{

    @Override
    public void run() {
        super.run();
        System.out.println("MyThread");
    }
}

测试代码:

@Test
public void test1(){
    MyThread myThread = new MyThread();
    myThread.start();
    System.out.println("运行结束");

}

因为java是单根继承,不支持多继承,在程序设计上是有局限的。

实现Runnable接口(推荐)

实现Runnale接口,重写run方法

public class MyRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("running");
    }
}

测试代码:

@Test
public void test2(){
    MyRunnable myRunnable = new MyRunnable();
    Thread thread = new Thread(myRunnable);
    thread.start();
    System.out.println("ending");

}
  • 注意,这里测试代码是实现Runnable接口后,通过Thread来接受Runnable接口对象,这样可以让Thread的start方法来调用Run方法,起到多线程的作用。
  • 如果直接在主程序中调用run方法,就仅仅是程序的调用,无法起到多线程的作用。

java多线程的同步

synchronize实现同步

java中的每一个对象都可以作为锁,具体表现形式为下面三种形式:

  • 对于普通同步方法,锁是当前实例对象
  • 对于静态同步方法,锁是当前类的class对象
  • 对于同步方法块,锁是Synchronize括号里配置的对象

上面三种形式默认synchronized相当于synchronized(this)锁this对象。

对于synchronized(非this对象x)这种情况,有下属三种结论:

  • 当多个线程同时执行synchronized(x){}同步代码块呈现同步效果。
  • 当其它线程执行x对象中synchronized同步方法呈现同步效果
  • 当其它线程执行x对象方法里面的synchronized(this)代码块也呈现同步效果。

volatile关键字

关键字volatile主要作用是使变量在多个线程间可见


Java线程间通信

等待/通知机制

等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B
调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而
执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的
关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

public class WaitNotify {
    static boolean flag = true;
    static Object  lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);

        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        public void run() {
            // 加锁,拥有lock的Monitor
            synchronized (lock) {
                // 当条件不满足时,继续wait,同时释放了lock的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. wait @ "
                                           + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                // 条件满足时,完成工作
                System.out.println(Thread.currentThread() + " flag is false. running @ "
                                   + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        public void run() {
            // 加锁,拥有lock的Monitor
            synchronized (lock) {
                // 获取lock的锁,然后进行通知,通知时不会释放lock的锁,
                // 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
            // 再次加锁
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
                                   + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}

我们在使用这些方法需要注意下面几点:

  • 1)使用wait()、notify()和notifyAll()时需要先对调用对象加锁。

      即这些方法必须位于synchronize同步块内。
    
  • 2)调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的
    等待队列。

  • 3)notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或
    notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。

      即会将notify()或者notifAll()所在的同步块执行完才会调转到wait()所在的同步块。
    
  • 4)notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()
    方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为
    BLOCKED。

  • 5)从wait()方法返回的前提是获得了调用对象的锁。

管道输入/输出流

管道流主要用于线程之间的数据传输,传输的媒介是内存。java中主要包括了4种具体的实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。

public class Piped {

    public static void main(String[] args) throws Exception {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        // 将输出流和输入流进行连接,否则在使用时会抛出IOException
        out.connect(in);

        Thread printThread = new Thread(new Print(in), "PrintThread");
        printThread.start();
        int receive = 0;
        try {
            while ((receive = System.in.read()) != -1) {
                out.write(receive);
            }
        } finally {
            out.close();
        }
    }

    static class Print implements Runnable {
        private PipedReader in;

        public Print(PipedReader in) {
            this.in = in;
        }

        public void run() {
            int receive = 0;
            try {
                while ((receive = in.read()) != -1) {
                    System.out.print((char) receive);
                }
            } catch (IOException ex) {
            }
        }
    }
}

join方法

  • thread.join()当前线程等待thread线程终止后才继续执行。

  • thread.join(long millis)具备超时特性,在上者情况下,如果在指定时间内进程没终止,将从超时方法返回。

  • thread.join(long millis,int nanos)

      public class Join {
          public static void main(String[] args) throws Exception {
              Thread previous = Thread.currentThread();
              for (int i = 0; i < 10; i++) {
                  // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
                  Thread thread = new Thread(new Domino(previous), String.valueOf(i));
                  thread.start();
                  previous = thread;
              }
      
              TimeUnit.SECONDS.sleep(5);
              System.out.println(Thread.currentThread().getName() + " terminate.");
          }
      
          static class Domino implements Runnable {
              private Thread thread;
      
              public Domino(Thread thread) {
                  this.thread = thread;
              }
      
              public void run() {
                  try {
                      thread.join();
                  } catch (InterruptedException e) {
                  }
                  System.out.println(Thread.currentThread().getName() + " terminate.");
              }
          }
      }
    

结果如下:

main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.

ThreadLocal的使用

ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这
个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个
线程上的一个值。

可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。

public class Profiler {
    // 第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次
    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
         protected Long initialValue() {
            return System.currentTimeMillis();}
    };

    public static final void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }

    public static final long end() {
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }

    public static void main(String[] args) throws Exception {
        Profiler.begin();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Cost: " + Profiler.end() + " mills");
    }
}

java多线程基础

线程优先级

在Java线程中,通过一个整型成员变量priority来控制优先级,优先级的范围从1~10,在线
程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分
配时间片的数量要多于优先级低的线程。设置线程优先级时,针对频繁阻塞(休眠或者I/O操
作)的线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较
低的优先级,确保处理器不会被独占。在不同的JVM以及操作系统上,线程规划会存在差异,
有些操作系统甚至会忽略对线程优先级的设定。

setPriority(int)

线程的状态

java线程在运行的生命周期可能处于下表的6中不同的状态,在给定的一个时刻,
线程只能处于其中的一个状态。

守护线程Daemon

Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这
意味着,当一个Java虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出。可以通过调
用Thread.setDaemon(true)将线程设置为Daemon线程。
Daemon线程被用作完成支持性工作,但是在Java虚拟机退出时Daemon线程中的finally块
并不一定会执行。

  • Daemon属性需要在启动线程之前设置,不能在启动线程之后设置
  • 在构建Daemon线程时,不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑。

线程中断

使用interrupt()方法来停止线程,但interrupt()方法仅仅是在当前线程中刚打了一个断点。

java提供两种方法判断线程是否中断:

  • this.interrupted()测试当前进程是否已经中断,执行后将具有状态标志清除为flase功能
  • this.isInterrupted()测试线程是否已经中断,不清除状态标志

推荐使用异常法来停止线程:

public class MyThreadInterrupt extends Thread {

    @Override
    public void run() {
        super.run();
        try{
            for(int i=0;i<500000;i++){
                if(this.isInterrupted()){
                    System.out.println("线程终止,即将退出");
                    throw new InterruptedException();
                }
                System.out.println("i="+(i+1));
            }
            System.out.println("for循环完了,继续执行程序。。。");
        }catch (InterruptedException e){
            System.out.println("进入catch捕捉异常");
            e.printStackTrace();
        }
    }
}

测试代码:

@Test
public void test3(){
    try{
        MyThreadInterrupt myThreadInterrupt = new MyThreadInterrupt();
        myThreadInterrupt.start();
        Thread.sleep(2000);
        myThreadInterrupt.interrupt();
    } catch (InterruptedException e){
        System.out.println("mian catch");
        e.printStackTrace();
    }

}

如果不使用抛异常来终止线程,那么线程即使中断还是会for循环继续运行

或者使用标志位来控制是否需要停止任务并终止线程

public class Shutdown {
    public static void main(String[] args) throws Exception {
        Runner one = new Runner();
        Thread countThread = new Thread(one, "CountThread");
        countThread.start();
        // 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
        TimeUnit.SECONDS.sleep(1);
        countThread.interrupt();
        Runner two = new Runner();
        countThread = new Thread(two, "CountThread");
        countThread.start();
        // 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
        TimeUnit.SECONDS.sleep(1);
        two.cancel();
    }

    private static class Runner implements Runnable {
        private long             i;

        private volatile boolean on = true;

        @Override
        public void run() {
            while (on && !Thread.currentThread().isInterrupted()) {
                i++;
            }
            System.out.println("Count i = " + i);
        }

        public void cancel() {
            on = false;
        }
    }
}

注意: suspend(),resume(),stop()已过期

全文来自 java并发编程的艺术 和 java多线程编程核心技术

0%