柚子快報邀請碼778899分享:數(shù)據(jù)庫 Redis-分布式鎖
Redis-分布式鎖
如何使用分布式鎖
正常在一個java服務中使用sync鎖或lock鎖完全可以滿足線程安全問題的,但是在部署集群的情況下,不同的jvm不能鎖同一個方法,因此需要分布式鎖用來保護線程安全問題。
分布式鎖實現(xiàn)
常見的分布式鎖解決方案:
Mysql:自帶悲觀鎖,但是不太好維護redis:利用setnx實現(xiàn)互斥,操作方便,推薦使用zookeeper:利用節(jié)點實現(xiàn)互斥
本章主要采用redis的方式進行實現(xiàn)
public interface ILock {
/**
* 分布式-互斥鎖
*/
boolean tryLock(String name, Long time, TimeUnit unit);
/**
* 分布式-釋放互斥鎖
*/
void unLock(String name);
}
/**
* 分布式鎖實現(xiàn)
*/
@Component
public class DistributedLock implements ILock{
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 分布式鎖key */
private final String DISTRIBUTED_LOCK = "distributed_lock:";
/**
* 分布式互斥鎖
*/
@Override
public boolean tryLock(String name, Long time, TimeUnit unit) {
// value
String value = Thread.currentThread().getId() + "";
// key
String key = DISTRIBUTED_LOCK + name;
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
// 防止自動拆箱空指針
return BooleanUtil.isTrue(aBoolean);
}
/**
* 分布式釋放鎖
*/
@Override
public void unLock(String name) {
String key = DISTRIBUTED_LOCK + name;
stringRedisTemplate.delete(key);
}
}
分布式鎖誤刪問題
在設置互斥鎖的時候為了解決redis宕機導致互斥鎖永久失效的情況下,加了一個過期時間。此時如果緩存重建的時間比過期時間更長,會導致多個線程釋放不同的鎖資源導致分布式鎖誤刪問題。 解決誤刪問題:
需要在獲取鎖時存入線程表示(uuid + 線程id)的方式在釋放鎖時需要先獲取鎖中的線程標識,判斷是否與當前線程標識一致
如果一致則釋放鎖如果不一致則不釋放鎖
更新代碼:
@Component
public class DistributedLock implements ILock{
@Resource
private StringRedisTemplate stringRedisTemplate;
/** 分布式鎖key */
private final String DISTRIBUTED_LOCK = "distributed_lock:";
/** UUID */
private String uuid = UUID.randomUUID(true).toString();
/**
* 分布式互斥鎖
*/
@Override
public boolean tryLock(String name, Long time, TimeUnit unit) {
// value
String value = uuid + Thread.currentThread().getId();
// key
String key = DISTRIBUTED_LOCK + name;
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time, unit);
// 防止自動拆箱空指針
return BooleanUtil.isTrue(aBoolean);
}
/**
* 分布式釋放鎖
*/
@Override
public void unLock(String name) {
String key = DISTRIBUTED_LOCK + name;
String value = uuid + Thread.currentThread().getId();
// 獲取互斥鎖中的值
String redisValue = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.equals(value,redisValue)){
stringRedisTemplate.delete(key);
}
}
}
Redisson入門
正常使用setnx實現(xiàn)分布式鎖存在以下幾種問題
不可重入鎖:同一現(xiàn)成無法多次獲取同一把鎖不可重試:獲取鎖只嘗試一次就返回,無法重試超時釋放:業(yè)務執(zhí)行耗時較長,會導致鎖釋放主從一致性:集群的情況下主節(jié)點宕機后同步數(shù)據(jù)過程種,導致鎖失效
Redisson 是一個 Java 高級 Redis 客戶端,提供了基于 Redis 的分布式和可擴展的 Java 數(shù)據(jù)結構,如并發(fā)集合(Concurrent Collections)、同步器(Synchronizers)、分布式服務(Distributed Services)等。Redisson 構建于 Jedis 之上,旨在簡化 Redis 的使用,尤其對于分布式環(huán)境中的應用程序而言,它提供了一種易于使用的 API 來處理 Redis 中的數(shù)據(jù),并實現(xiàn)了多種分布式鎖和其他高級功能。Redisson底層采用的是Netty 框架
案例:每個用戶對一件商品只能下一單。
配置文件
redisson:
# redis key前綴
keyPrefix:
# 線程池數(shù)量
threads: 4
# Netty線程池數(shù)量
nettyThreads: 8
# 單節(jié)點配置
singleServerConfig:
# 客戶端名稱
clientName: ${ruoyi.name}
# 最小空閑連接數(shù)
connectionMinimumIdleSize: 8
# 連接池大小
connectionPoolSize: 32
# 連接空閑超時,單位:毫秒
idleConnectionTimeout: 10000
# 命令等待超時,單位:毫秒
timeout: 3000
# 發(fā)布和訂閱連接池大小
subscriptionConnectionPoolSize: 50
/**
* Redisson 配置屬性
*
*/
@Data
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {
/**
* redis緩存key前綴
*/
private String keyPrefix;
/**
* 線程池數(shù)量,默認值 = 當前處理核數(shù)量 * 2
*/
private int threads;
/**
* Netty線程池數(shù)量,默認值 = 當前處理核數(shù)量 * 2
*/
private int nettyThreads;
/**
* 單機服務配置
*/
private SingleServerConfig singleServerConfig;
/**
* 集群服務配置
*/
private ClusterServersConfig clusterServersConfig;
@Data
@NoArgsConstructor
public static class SingleServerConfig {
/**
* 客戶端名稱
*/
private String clientName;
/**
* 最小空閑連接數(shù)
*/
private int connectionMinimumIdleSize;
/**
* 連接池大小
*/
private int connectionPoolSize;
/**
* 連接空閑超時,單位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超時,單位:毫秒
*/
private int timeout;
/**
* 發(fā)布和訂閱連接池大小
*/
private int subscriptionConnectionPoolSize;
}
@Data
@NoArgsConstructor
public static class ClusterServersConfig {
/**
* 客戶端名稱
*/
private String clientName;
/**
* master最小空閑連接數(shù)
*/
private int masterConnectionMinimumIdleSize;
/**
* master連接池大小
*/
private int masterConnectionPoolSize;
/**
* slave最小空閑連接數(shù)
*/
private int slaveConnectionMinimumIdleSize;
/**
* slave連接池大小
*/
private int slaveConnectionPoolSize;
/**
* 連接空閑超時,單位:毫秒
*/
private int idleConnectionTimeout;
/**
* 命令等待超時,單位:毫秒
*/
private int timeout;
/**
* 發(fā)布和訂閱連接池大小
*/
private int subscriptionConnectionPoolSize;
/**
* 讀取模式
*/
private ReadMode readMode;
/**
* 訂閱模式
*/
private SubscriptionMode subscriptionMode;
}
}
/**
* redis配置
*
*/
@Slf4j
@AutoConfiguration
@EnableCaching
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {
@Autowired
private RedissonProperties redissonProperties;
@Autowired
private ObjectMapper objectMapper;
@Bean
public RedissonAutoConfigurationCustomizer redissonCustomizer() {
return config -> {
ObjectMapper om = objectMapper.copy();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化輸入的類型,類必須是非final修飾的。序列化時將對象全類名一起保存下來
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
TypedJsonJacksonCodec jsonCodec = new TypedJsonJacksonCodec(Object.class, om);
// 組合序列化 key 使用 String 內容使用通用 json 格式
CompositeCodec codec = new CompositeCodec(StringCodec.INSTANCE, jsonCodec, jsonCodec);
config.setThreads(redissonProperties.getThreads())
.setNettyThreads(redissonProperties.getNettyThreads())
// 緩存 Lua 腳本 減少網(wǎng)絡傳輸(redisson 大部分的功能都是基于 Lua 腳本實現(xiàn))
.setUseScriptCache(true)
.setCodec(codec);
RedissonProperties.SingleServerConfig singleServerConfig = redissonProperties.getSingleServerConfig();
if (ObjectUtil.isNotNull(singleServerConfig)) {
// 使用單機模式
config.useSingleServer()
//設置redis key前綴
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(singleServerConfig.getTimeout())
.setClientName(singleServerConfig.getClientName())
.setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize())
.setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize())
.setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());
}
// 集群配置方式 參考下方注釋
RedissonProperties.ClusterServersConfig clusterServersConfig = redissonProperties.getClusterServersConfig();
if (ObjectUtil.isNotNull(clusterServersConfig)) {
config.useClusterServers()
//設置redis key前綴
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
.setTimeout(clusterServersConfig.getTimeout())
.setClientName(clusterServersConfig.getClientName())
.setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout())
.setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize())
.setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize())
.setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize())
.setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize())
.setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize())
.setReadMode(clusterServersConfig.getReadMode())
.setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
}
log.info("初始化 redis 配置");
};
}
@RequiredArgsConstructor
@Service
public class BookOrderServiceImpl implements IBookOrderService {
private final BookOrderMapper baseMapper;
private final SysUserMapper sysUserMapper;
private final BooksMapper booksMapper;
private final BookOrderDetailMapper bookOrderDetailMapper;
/** 自定義分布式鎖 */
private final DistributedLock distributedLock;
/** redission */
private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 模擬庫存扣減并發(fā)問題
*/
@Transactional(rollbackFor = Exception.class)
@Override
public void inventory(String bookId,Long userId) {
// 一人一單校驗
Long aLong = bookOrderDetailMapper.selectCount(
Wrappers.lambdaQuery(BookOrderDetail.class).eq(BookOrderDetail::getNumber, userId)
.eq(BookOrderDetail::getBookId,bookId)
);
if (aLong > 0){
throw new ServiceException("下單失敗");
}
// 自定義獲取鎖
// boolean piker = distributedLock.tryLock("PIKER", 10L, TimeUnit.SECONDS);
// redisClient分布式鎖
RLock lock = CLIENT.getLock("lock:order:");
// 默認失敗不等待鎖時間,鎖過期時間30秒
boolean piker = lock.tryLock();
if (piker){
try {
// 訂單業(yè)務
placingOrder(bookId, userId);
}finally {
// 自定義釋放鎖
// distributedLock.unLock("PIKER");
// redisson 釋放鎖
lock.unlock();
}
}
}
/**
* 業(yè)務操作
*/
private void placingOrder(String bookId, Long userId) {
// 1.減少庫存
Books books = booksMapper.selectById(bookId);
books.setStockQuantity(books.getStockQuantity() - 1);
booksMapper.updateById(books);
// 2.增加訂單
BookOrderDetail bookOrder = new BookOrderDetail();
bookOrder.setBookId(Long.parseLong(bookId));
bookOrder.setNumber(userId.intValue());
bookOrderDetailMapper.insert(bookOrder);
}
}
Redisson-分布式鎖實現(xiàn)原理
1.可重入鎖
方法1{
獲取鎖
調用方法2
}
方法2{
獲取鎖
}
以上這種情況下使用自定義的setnx方式就會造成死鎖的情況,比較經(jīng)典的重入鎖。
Rdisson使用Lua腳本來實現(xiàn)可重入鎖的。
2.重試機制,超時釋放
重試機制:在設置互斥鎖時有兩個線程A,B。A線程先獲取鎖資源,之后B在獲取鎖就會一直失敗,因為鎖的互斥性,沒有重試的機制。
超時釋放:給鎖設置一個過期時間,防止redis宕機情況下鎖一直沒有辦法被釋放導致死鎖情況,或者因為業(yè)務原因導致緩存重建時間大于鎖過期時間導致數(shù)據(jù)丟失
注意:redisson不同版本的代碼不同,但是整體流程是大差不大的,下面是結合黑馬程序猿老師結合總結的流程圖。 如果自己設置失效時間的話,鎖過期時間就不是-1因此就不會觸發(fā)看門狗機制了。
獲取鎖:
釋放鎖:
有了以上的機制可以實現(xiàn):我有三個線程 A,B 設置等待時間3秒,線程A先獲取到鎖,由于業(yè)務原因進行阻塞,此時線程2開始獲取鎖。線程A業(yè)務執(zhí)行了4秒,那么首先線程2獲取鎖失敗。如果線程A執(zhí)行業(yè)務在3秒內完成,那么線程2可以成功獲取鎖。
柚子快報邀請碼778899分享:數(shù)據(jù)庫 Redis-分布式鎖
文章鏈接
本文內容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。