選單

分散式鎖:Redisson原始碼解析-ReadWriteLock

說到讀寫鎖,大家都會很迅速的反應過來,讀寫鎖的存在就是為了提升實際的應用的併發能力,可以保證讀讀不互斥,讀寫互斥,寫寫互斥

一、概念及實現

1. 概念

核心介面ReadWriteLock是基於Java裡的ReadWriteLock構建的,讀鎖和寫鎖都實現了 RLock 介面

允許多個 ReadLock 所有者和僅一個 WriteLock 所有者

就是讀讀不互斥

寫寫互斥

讀寫互斥

如果獲取鎖的 Redisson 例項崩潰,那麼這種鎖可能會永遠掛在獲取狀態

為了避免這種Redisson維護鎖看門狗,它會在鎖持有者Redisson例項存活時延長鎖到期時間

也可以設定鎖的持有時間leaseTime

2. 實現

RReadWriteLock lock = redissonClient。getReadWriteLock(“lockName”);lock。readLock()。lock();lock。readLock()。unlock();lock。writeLock()。lock();lock。writeLock()。unlock();

使用起來還是很簡單的

初始化

實際初始化的物件是一個RedissonReadWriteLock,這個物件會持有兩個小的物件RedissonReadLock RedissonWriteLock

這樣在使用的時候,就可以根據需要來控制需要的是writeLock還是readLock

二、原始碼解析

話不多說,直接看原始碼說明

1. 加讀鎖

實際走一遍

lock。readLock()。lock()

流程的時候,會發現,基本整個程式碼的邏輯都是走的RLock的,而重構了實際加鎖的指令碼

tryLockInnerAsync

加讀鎖lua邏輯

// KEY[1] = lockName// KEY[2] = {lockName}:uuid:threadId:rwlock_timeout// ARVG[1] = leaseTime// ARVG[2] = uuid:threadId// ARVG[3] = uuid:threadId:write// hget lockName modelocal mode = redis。call(‘hget’, KEYS[1], ‘mode’); // mode 屬性不存在if (mode == false) then // 設定lockName mode屬性值為read,直接加讀鎖 // hset lockName mode read redis。call(‘hset’, KEYS[1], ‘mode’, ‘read’); // 加鎖數量增加1 // hset lockName uuid:threadId 1 redis。call(‘hset’, KEYS[1], ARGV[2], 1); // set {lockName}:uuid:threadId:rwlock_timeout:1 1 redis。call(‘set’, KEYS[2] 。。 ‘:1’, 1); // pexpire {lockName}:uuid:threadId:rwlock_timeout:1 leaseTime redis。call(‘pexpire’, KEYS[2] 。。 ‘:1’, ARGV[1]); // 設定過期時間 // pexpire lockName leaseTime redis。call(‘pexpire’, KEYS[1], ARGV[1]); return nil;end;

1。從redis中獲取hash結構資料lockName的mode屬性值

第一次來加鎖,那麼這個key都不存在,那麼mode屬性肯定也是不存在了

2。設定lockName hash結構key,且mode屬性值為read

{ “lockName”: { “mode”: “read” }}

3。設定lockName hash結構key的uuid:threadId的屬性值為1

{ “lockName”: { “mode”: “read”, “uuid:threadId”: 1 }}

4。設定{lockName}:uuid:threadId:rwlock_timeout:1的一個key值為1

{ “lockName”: { “mode”: “read”, “uuid:threadId”: 1 }, “{lockName}:uuid:threadId:rwlock_timeout:1”: 1}

5。設定{lockName}:uuid:threadId:rwlock_timeout:1 key的失效時間為leaseTime

6。設定lockName key的失效時間為leaseTime

7。返回null

所以,如果加了一個讀鎖,那麼就會生成兩個key,

lockName {lockName}:uuid:threadId:rwlock_timeout:1

2. 讀鎖watchdog

加鎖成功後,如果沒有設定了leaseTime,就會執行排程續約時間修正,

發現加讀鎖後的watchdog被重寫了,

org。redisson。RedissonReadLock#renewExpirationAsync

// KEY[1] = lockName// KEY[2] = {lockName}// ARVG[1] = leaseTime// ARVG[2] = uuid:threadId// hget lockName uuid:threadIdlocal counter = redis。call(‘hget’, KEYS[1], ARGV[2]); if (counter ~= false) then // pexpire lockName leaseTime redis。call(‘pexpire’, KEYS[1], ARGV[1]); // hlen lockName > 1 if (redis。call(‘hlen’, KEYS[1]) > 1) then // hkeys lockName local keys = redis。call(‘hkeys’, KEYS[1]); for n, key in ipairs(keys) do // hget lockName key counter = tonumber(redis。call(‘hget’, KEYS[1], key)); // 如果獲取到了,就批次給續個時間 if type(counter) == ‘number’ then for i=counter, 1, -1 do redis。call(‘pexpire’, KEYS[2] 。。 ‘:’ 。。 key 。。 ‘:rwlock_timeout:’ 。。 i, ARGV[1]); end; end; end; return 1; end;end; return 0;

對redis的lock進行續約

獲取lockNamekey中uuid:threadId的值,這個值表示的是這個執行緒獲取的讀鎖數量,在第一次加讀鎖後,這個地方的值應該是1

如果當前執行緒還在持有讀鎖,就會走watchdog的邏輯了

給lockName的key續約,設定過期時間為leaseTime

獲取lockName key中的所有的屬性,並遍歷

值為數字的key屬性進行處理

會遍歷給 {lockName}:uuid:threadId:rwlock_timeout:i 續約leaseTime,續約成功則返回1

3. 加可重入讀鎖

在加鎖的狀態下,現在同一執行緒又來加讀鎖了,也就是可重入鎖

// KEY[1] = lockName// KEY[2] = {lockName}:uuid:threadId:rwlock_timeout// ARVG[1] = leaseTime// ARVG[2] = uuid:threadId// ARVG[3] = uuid:threadId:write// 如果已經是加了讀鎖的了或者是加寫鎖的人是自己if (mode == ‘read’) // 或者是 加了寫鎖且lockName的uuid:threadId:write屬性是存在的 or (mode == ‘write’ and redis。call(‘hexists’, KEYS[1], ARGV[3]) == 1) then 加鎖數量增加1 // incrby lockName uuid:threadId 1 local ind = redis。call(‘hincrby’, KEYS[1], ARGV[2], 1); local key = KEYS[2] 。。 ‘:’ 。。 ind; redis。call(‘set’, key, 1); redis。call(‘pexpire’, key, ARGV[1]); local remainTime = redis。call(‘pttl’, KEYS[1]); redis。call(‘pexpire’, KEYS[1], math。max(remainTime, ARGV[1])); return nil;end;return redis。call(‘pttl’, KEYS[1]);

在加讀鎖的程式碼中,還有下面一個分支判斷

如果資源已經被加了讀鎖,或者是被當前執行緒加了寫鎖

自增一下lockName的屬性uuid:threadId值,表示加鎖的數量+1

設定key為 {lockName}:uuid:threadId:rwlock_timeout:i值為1

設定{lockName}:uuid:threadId:rwlock_timeout:i的過期時間為leaseTime

獲取lockName的ttl,設定lockName的過期時間為 ttl和leaseTime之間的更大值,

所以有可能存在ttl是比leaseTime大的情況

,通常處於加了可重入寫鎖

如果加鎖成功就返回null

4. 加寫鎖

寫鎖也是一樣的,在redis的資料結構上,嘗試加了寫鎖

// KEY[1] lockName// ARGV[1] leaseTime// ARGV[2] uuid:threadId:write// hget lockName modelocal mode = redis。call(‘hget’, KEYS[1], ‘mode’);if (mode == false) then // hset lockName mode write redis。call(‘hset’, KEYS[1], ‘mode’, ‘write’); // hset lockName uuid:threadId:write 1 redis。call(‘hset’, KEYS[1], ARGV[2], 1); // pexpire lockName leaseTime redis。call(‘pexpire’, KEYS[1], ARGV[1]); return nil;end;

1。先去redis中獲取lockName的mode屬性

2。如果沒有加過鎖,那麼這個mode就是不存在的

3。設定lockName key屬性mode的值為write

{ “lockName”: { “mode”: “write” }}

4。設定lockName一個屬性uuid:threadId:write值為1

{ “lockName”: { “mode”: “write”, “uuid:threadId:write”: 1 }}

5。設定lockName的過期時間為leaseTime

6。如果加鎖成功就返回null

5. 寫鎖watchdog

判斷lockName中存在屬性uuid:threadId

設定過期時間為leaseTime

6. 可重入加寫鎖

// KEY[1] lockName// ARGV[1] leaseTime// ARGV[2] uuid:threadId:writeif (mode == ‘write’) then // hexists lockName uuid:threadId:write if (redis。call(‘hexists’, KEYS[1], ARGV[2]) == 1) then // hincrby lockName uuid:threadId:write 1 redis。call(‘hincrby’, KEYS[1], ARGV[2], 1); // pttl lockName local currentExpire = redis。call(‘pttl’, KEYS[1]); // pexpire lockName ttl+leaseTime redis。call(‘pexpire’, KEYS[1], currentExpire + ARGV[1]); return nil; end;end;return redis。call(‘pttl’, KEYS[1]);

同一執行緒來加寫鎖,表明這個是可重入寫鎖

獲取lockName的mode屬性,如果等於write就是已經處於寫鎖獲取的狀態了,再透過判斷lockName中的屬性uuid:threadId:write是否存在來判斷是否是可重入鎖

會先將lockName的屬性uuid:threadId:write的值+1

獲取lockName的ttl

設定lockName的過期時間為:ttl+leaseTime,時間疊加

如果加鎖成功就返回null

7. 其他執行緒加寫鎖

加寫鎖的lua邏輯裡面,是有兩個判斷的,一個是判斷是否lockName被加寫鎖、一個是加鎖的執行緒是當前執行緒

才會走到對應的邏輯裡,否則就會直接返回lockname的ttl

8. 可重入加讀鎖

與上面的

3

的邏輯是一致的

9. 加讀鎖

加讀鎖的時候也會判斷,如果沒有加鎖,就會直接加讀鎖,如果加鎖了,就會判斷鎖是不是讀鎖,或者加寫鎖的是自己的執行緒

10. 加了讀鎖之後,加寫鎖

毫無波瀾,直接返回了ttl

1. 釋放讀鎖

// KEY[1] lockName// KEY[2] redisson_rwlock:{lockName}// KEY[3] {lockName}:uuid:threadId:rwlock_timeout// KEY[4] {lockName}// ARVG[1] 0// ARVG[2] uuid:threadId// hget lockName modelocal mode = redis。call(‘hget’, KEYS[1], ‘mode’); if (mode == false) then // publish redisson_rwlock:{lockName} 0 redis。call(‘publish’, KEYS[2], ARGV[1]); return 1;end; // hexists lockName uuid:threadIdlocal lockExists = redis。call(‘hexists’, KEYS[1], ARGV[2]);if (lockExists == 0) then return nil;end;// hincrby lockName uuid:threadId -1local counter = redis。call(‘hincrby’, KEYS[1], ARGV[2], -1); if (counter == 0) then // hdel lockName uuid:threadId redis。call(‘hdel’, KEYS[1], ARGV[2]);end;// del {lockName}:uuid:threadId:rwlock_timeout:(counter+1)redis。call(‘del’, KEYS[3] 。。 ‘:’ 。。 (counter+1)); // hlen lockName > 1if (redis。call(‘hlen’, KEYS[1]) > 1) then local maxRemainTime = -3; // hkeys lockName local keys = redis。call(‘hkeys’, KEYS[1]); for n, key in ipairs(keys) do // hget lockName keys counter = tonumber(redis。call(‘hget’, KEYS[1], key)); if type(counter) == ‘number’ then // 遍歷獲取最大的ttl for i=counter, 1, -1 do // pttl {lockName}:key:rwlock_timeout:i local remainTime = redis。call(‘pttl’, KEYS[4] 。。 ‘:’ 。。 key 。。 ‘:rwlock_timeout:’ 。。 i); // maxRemainTime = math。max(remainTime, maxRemainTime); end; end; end; if maxRemainTime > 0 then // pexpire lockName maxRemainTime redis。call(‘pexpire’, KEYS[1], maxRemainTime); return 0; end; if mode == ‘write’ then return 0; end;end;redis。call(‘del’, KEYS[1]);redis。call(‘publish’, KEYS[2], ARGV[1]);return 1;

從redis中獲取lockName的mode屬性,如果不存在就表示已經沒有人持有鎖了,直接返回1

判斷是否存在lockName中是否存在uuid:threadId屬性,不存在直接返回null

將lockName中的uuid:threadId屬性值-1,如果此時發現屬性值已經為0了,就直接刪除掉uuid:threadId的屬性

刪除對應的另一個key:{lockName}:uuid:threadId:rwlock_timeout:(counter+1)

獲取lockName裡面所有的屬性,獲取keys——> {lockName}:uuid:threadId:rwlock_timeout:i中的最大ttl

設定lockName的過期時間為最大的ttl,返回0,表示釋放鎖成功

如果已經被寫鎖持有了,就返回0,表示釋放鎖成功

其他情況將lockName key刪除掉

2. 釋放寫鎖

// KEY[1] = lockName// KEY[2] = redisson_rwlock:lockName// ARVG[1] = 0 // LockPubSub。READ_UNLOCK_MESSAGE// ARVG[2] = leaseTime// ARVG[3] = uuid:threadId:write// 獲取並校驗modelocal mode = redis。call(‘hget’, KEYS[1], ‘mode’); if (mode == false) then // 不存在,直接返回1 redis。call(‘publish’, KEYS[2], ARGV[1]); return 1;end;// 當前加鎖為write鎖if (mode == ‘write’) then // 判斷是否存在當前執行緒加的寫鎖 uuid:threadId:write local lockExists = redis。call(‘hexists’, KEYS[1], ARGV[3]); if (lockExists == 0) then // 不存在直接返回null,這裡的意思就是不是自己加的寫鎖,不能釋放 return nil; else // uuid:threadId:write值-1 local counter = redis。call(‘hincrby’, KEYS[1], ARGV[3], -1); if (counter > 0) then // 如果有可重入鎖,就重置一下過期時間 redis。call(‘pexpire’, KEYS[1], ARGV[2]); return 0; else // 刪除uuid:threadId:write屬性 redis。call(‘hdel’, KEYS[1], ARGV[3]); // 判斷了一下是不是隻有一個寫鎖持有 if (redis。call(‘hlen’, KEYS[1]) == 1) then // 刪除key redis。call(‘del’, KEYS[1]); redis。call(‘publish’, KEYS[2], ARGV[1]); else // 表示有讀鎖,就轉成讀鎖mode redis。call(‘hset’, KEYS[1], ‘mode’, ‘read’); end; return 1; end; end;end;return nil;

三、思考

讀鎖

加讀鎖的時候,實際是產出了兩個redis key,一個是lockName,一個是{lockName}:uuid:threadId:rw_timeout:1的key,同時如果有其他執行緒再來加讀鎖的話,會持續遞增這個資料

寫鎖

加寫鎖就是一個鎖,但是他的屬性值變了,是uuid:threadId:write,但是同時,可能被當前執行緒獲取到讀鎖

作者:靜鑫下

連結:https://juejin。cn/post/7038514931909525534

分散式鎖:Redisson原始碼解析-ReadWriteLock