切換語言為:簡體
基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐

基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐

  • 爱糖宝
  • 2024-11-16
  • 2032
  • 0
  • 0

一、背景

       某系統作為公司產品矩陣底座,每天面對數十億級流量請求。在覈心介面全鏈路壓測時發現需多次透過網路請求分散式快取,影響介面耗時。快取、熔斷、限流作為應對高併發系統的三板斧,其中熔斷限流作為系統的自我保護機制,而快取作為介面效能提升利器備受關注。

二、問題分析

       分散式記憶體快取讀取相對訪問db磁碟至少是數量級領先,但仍然存在網路請求,於是引入本地快取形成本地快取、分散式快取與db形成三級儲存結構。但任何事情都具有兩面性,引入快取後同一份資料會儲存在不同的地方,帶來資料一致性的問題。

     目前資料一致性解決方案有強一致、弱一致及最終一致,強一致的需要額外的手段保障實現成本高,最終一致為為弱一致性的特例。結合當前業務場景及實現成本,此處選擇最終一致性作為當前系統的一致性解決方案,放棄強一致不代表不追求一致性,會盡可能追求資料的一致性。

三、本地快取選型

  1. LinkedHashMap

  • 優點:jdk內建數據結構,無需引入其他元件,執行緒安全;

  • 缺點:缺少容量限制、無淘汰策略,需要自己開發;

  1. Guava

  • 優點:Guava作為Google團隊開源的一款Java核心增強庫,在效能和穩定性上都有保障,同時提供容量限制、兩種淘汰策略(LRU和LFU);

  • 缺點:淘汰策略不夠完善,使用任何一種都存在一定問題。比如LRU在面對偶爾批次重新整理資料時很容易將快取內容擠出,帶來擊穿風險。LFU在面對突發流量時顯得力不從心,需要時間累積使用頻率。

  1. Caffeine

  • 優點:caffeine以Guava為基礎改進而來,站在巨人的肩膀上有更高的起點。在LFU基礎上改進形成W-TinyLFU淘汰演算法,在一定程度上解決了LFU的缺點,讓caffeine有更高的快取命中效率; 基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐

        根據官方的測試資料來看,Caffeine在效能上優於其他幾種元件,因此此處選用Caffeine作為本地快取,Caffeine過期策略:

  • expireAfterAccess: 在指定的過期時間內沒有讀寫,再次訪問時會將資料判斷為失效。如果一直訪問則永不過期,不能單獨使用;

  • expireAfterWrite: 在指定的過期時間內沒有再次寫入,再次訪問時會將資料判斷為失效,此時請求會阻塞等待載入新資料返回;

  • refreshAfterWrite: 在指定的過期時間後訪問時,會再次載入重新整理快取資料,在重新整理任務未完成之前,其他執行緒返回舊值;

上述任何一種策略單獨使用都存在一定的問題,下面嘗試將兩者組合進行分析:

組合1:expireAfterAccess和expireAfterWrite 缺點:快取過期後,由一個請求去執行後端查詢,其他請求都在阻塞等待結果返回。如果同時有大量的請求阻塞,對系統吞吐量有一定影響。

組合2:expireAfterAccess和refreshAfterWrite 缺點:如果某個key一直被訪問,則會一直不過期。

組合3:expireAfterWrite和refreshAfterWrite 將組合3兩個過期時間比例設成3:1,可以在保證資料失效的同時可以防止大量請求因資料過期造成阻塞,因此選用組合3作為本地快取的過期策略。

四、本地快取選型

       常見的分散式快取有Memcache、Redis等,但Redis相比Memcache有更豐富的功能特性(持久化、釋出訂閱、主從複製等), 因此選用Redis作為分散式快取。

五、MySQL與Redis資料同步

1、先更新MySQL再更新Redis
  • 缺點:A、在高併發情況下,假設請求A先寫MySQL然後卡頓,隨後請求B寫MySQL再更新Redis,請求A最後再更新Redis,會存在舊值覆蓋新值;B、寫完資料庫立即寫快取,可能會存在浪費系統資源;

  • 是否推薦使用:不推薦

2、先更新Redis再更新MySQL
  • 缺點:更新Redis成功但寫MySQL失敗;

  • 是否推薦使用:不推薦

3、先刪除Redis再寫MySQL
  • 缺點:在高併發情況下,假設請求A先刪除Redis然後卡頓,請求B請求Redis沒值則讀取MySQL,再快取在Redis,然後請求A再寫MySQL,則此時快取中的值為髒資料。

  • 解決辦法:延時雙刪

  • 是否推薦使用:不推薦

4、先寫MySQL再刪除Redis
  • 缺點1:假設寫請求A先來,請求A寫MySQL然後卡頓沒來得及刪除快取,請求B讀取快取會是舊值,隨後請求A就將快取刪除,B讀取了一次舊值,可以接受;

  • 缺點2

  • 快取到過期時間會自動失效;

  • 請求A查詢快取,發快取中沒有資料,查詢資料庫的舊值,但由於網路原因卡頓了,沒有來得及更新快取;

  • 請求B寫資料庫,接著刪除了快取;

  • 請求A更新舊值到快取中;

    上述情形出現條件需要滿足:快取剛好失效請求A查詢耗時比請求B更新資料庫耗時時間還要長,出現機率極小。

  • 是否推薦使用:推薦

5、利用Canal監聽MySQL binlog

        Canal是用Java開發的基於資料庫增量日誌解析,提供增量資料訂閱&消費的中介軟體。目前Canal支援MySQL的binlog解析,解析完成後才利用Canal Client來處理獲得的相關資料。 基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐       

綜上所述,前3種方案存在問題,第四種可以接受但是需在執行MySQL變更時需要操作Redis操作重新整理快取,存在耦合,因此選用方案5。

六、redis與本地快取同步

1、訊息佇列廣播模式

基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐         

當binlog有變更時推送訊息到訊息佇列,應用例項採用廣播模式消費,消費時搶鎖更新Redis快取,然後更新本地快取;

  • 優點:廣播機制實現本地快取更新;

  • 缺點:依賴分散式鎖、需要支援廣播的訊息佇列;

2、基於redis釋出訂閱

基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐         

當canal收到binlog變更時,將變更訊息推送到訊息佇列,應用例項以叢集模式消費後更新分散式快取,然後向Redis叢集釋出訊息。註冊訂閱的例項收到釋出訊息後更新本地快取(覆蓋或刪除本地快取),具體流程如上所示。

  • 缺點:當訂閱Redis叢集的例項比較多時,更新本地快取可能會存在時延;

  • 優點:輕量;

       目前訊息佇列技術棧選用的是Kafka,但Kafka無法支援廣播消費(RocketMQ支援廣播消費),同時廣播消費需要分散式鎖支援相對較重,因此此處選用Redis釋出訂閱模式。

七、方案實踐

1、canal部署

基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐 此處在雲環境搭建canal服務,示意圖如上:

canal配置binlog監聽

anal.instance.master.address:資料庫db連線;

canal.instance.dbUsername:連線資料庫賬號;

canal.instance.dbPassword:連線資料庫密碼;

canal.instance.filter.regex:監聽庫中哪些表格的正規表示式;

canal.mq.topic:傳送訊息佇列的主題;

2、消費kafka更新分散式快取及釋出變更:
  • 消費kafka:

 public void onMessage(List<String> messages) throws KafkaConsumeRetryException {
        if (CollectionUtils.isEmpty(messages)) {
            return;
        }
        log.info("[Binlog] receive msg from bin log {}", messages);
        for (String msg : messages) {
            BinlogKafkaDTO binlogKafkaDTO = JSONObject.parseObject(msg, BinlogKafkaDTO.class);
            //省略非關鍵程式碼
            ICouponCacheHandler iCouponCacheHandler = handleMap.get(binlogKafkaDTO.getTable());
            if (iCouponCacheHandler == null) {
                log.error("[Binlog] no handler find msg:{}", binlogKafkaDTO);
                continue;
            }
			//判斷是否為插入還是更新,刪除需走另外的邏輯
            boolean insertOrUpdate = SfStrUtil.equals(binlogKafkaDTO.getType(), CacheConstant.EventType.INSERT) || SfStrUtil.equals(binlogKafkaDTO.getType(), CacheConstant.EventType.UPDATE);
            iCouponCacheHandler.updateCache(multiLevelCacheUtil, binlogKafkaDTO, insertOrUpdate);
        }
    }
    
  default void updateCache(MultiLevelCacheUtil multiLevelCacheUtil, BinlogKafkaDTO binlogKafkaDTO, Boolean insertOrUpdate) {
        JSONArray jsonArray = binlogKafkaDTO.getData();
        for (int i = 0, num = jsonArray.size(); i < num; ++i) {
            BinlogBaseInfo obj = JSONObject.parseObject(JSONObject.toJSONString(jsonArray.get(i)), BinlogBaseInfo.class);
            String id = obj.getId();
            if (insertOrUpdate) {
                multiLevelCacheUtil.putForCache(String.format(getCachePrefix(), id), JSONObject.toJSONString(obj), CacheConstant.EXPIRE_SIZE);
            } else {
                //delete
                multiLevelCacheUtil.deleteCache(String.format(getCachePrefix(), id));
            }
        }
    }

  • 更新分散式快取及釋出變更:

public <T> void putForCache(String key, T value, long expireTime) {
        if (expireTime <= 0) {
            redisUtil.set(key, value);
        } else {
            redisUtil.set(key, value, expireTime);
        }

        redisUtil.getRedisTemplate().convertAndSend(key, value instanceof String ? value : SfJsonUtil.toJsonStr(value));
    }
 
   public void deleteCache(String key) {
        try {
            redisUtil.unlink(key);
            redisUtil.getRedisTemplate().convertAndSend(key, CacheConstant.DELETE_FLAG);
        } catch (Exception e) {
            log.error("deleteCache error key:{}", key, e);
        }
    }

3、向redis訂閱註冊及回撥處理
  • 監聽回撥基類

public abstract class AbstractRedisMessageListener implements MessageListener {
private CaffeineCacheUtil caffeineCacheUtil;
public AbstractRedisMessageListener(CaffeineCacheUtil caffeineCacheUtil) {
this.caffeineCacheUtil = caffeineCacheUtil;
}
@Override
public void onMessage(Message message, byte[] pattern) {
String data = LocalCacheConstant.VALUE_SERIALIZER.deserialize(message.getBody());
String channel = new String(message.getChannel());
//如果是刪除redis快取,則清除本地快取 否則更新本地快取或者插入 則直接覆蓋
if (SfStrUtil.equals(LocalCacheConstant.DELETE_FLAG, data)) {
caffeineCacheUtil.evictCache(getCacheName(), channel);
} else {
caffeineCacheUtil.putCache(getCacheName(), channel, data);
}
}
public abstract Set<String> getTopics();

public abstract SubscribeType getSubscribeType();

public abstract String getCacheName();
}

  • 渠道黑名單監聽實現如下:

@Slf4j
@Component
public class BlackWaybillSourceRedisMessageListener extends AbstractRedisMessageListener {
    public BlackIpRedisMessageListener(CaffeineCacheUtil caffeineCacheUtil) {
        super(caffeineCacheUtil);
    }
    @Override
    public Set<String> getTopics() {
        return Sets.newHashSet(LocalCacheConstant.RedisMessageTopic.BlackWaybillSource);
    }

    @Override
    public SubscribeType getSubscribeType() {
        return SubscribeType.CHANNEL_TYPE;
    }
    @Override
    public String getCacheName() {
        return LocalCacheConstant.CaffineCacheName.BlackWaybillSource;
    }
}

此處以將所有渠道黑名單作為集合當成一個key,則以頻道註冊。比如需快取訂單,需要以訂單號維度快取訂單,則以模式註冊。

  • 向redis訂閱註冊:

@Configuration
public class RedisMessageListenerConfig {


    @Bean
    @ConditionalOnBean(AbstractRedisMessageListener.class)
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,List<AbstractRedisMessageListener> redisMessageListeners) {

        RedisMessageListenerContainer redisMessageListenercontainer = new RedisMessageListenerContainer();
        redisMessageListenercontainer.setConnectionFactory(factory);
        for (AbstractRedisMessageListener redisMessageListener : redisMessageListeners) {
            //判斷是模式註冊還是頻道訂閱
            boolean channelType = ObjectUtil.equals(SubscribeType.CHANNEL_TYPE, redisMessageListener.getSubscribeType());
            redisMessageListener.getTopics().stream().forEach(channel ->
                    redisMessageListenercontainer.addMessageListener(redisMessageListener, channelType ? new ChannelTopic(channel):new PatternTopic(channel)));
        }
        log.info("Register listener for redisMessageListenerContainer num:{}", redisMessageListeners.size());
        return redisMessageListenercontainer;
    }
}

4、快取建立
  • 利用spring cacheManager批次建立快取:

public CacheManager cacheManagerWithCaffeine() {
        CaffeineCacheManager cacheManager = new CaffeineCacheManager();
        Caffeine caffeine = Caffeine.newBuilder()
            //cache的初始容量值
            .initialCapacity(StringUtils.isEmpty(initCapacity)?100:Integer.parseInt(initCapacity))
            //maximumSize用來控制cache的最大快取數量,maximumSize和maximumWeight(最大權重)不可以同時使用,
            .maximumSize(StringUtils.isEmpty(maxSize)?1000:Long.parseLong(maxSize))
            //建立或更新之後多久重新整理,需要設定cacheLoader
            .refreshAfterWrite(StringUtils.isEmpty(refreshAfterWrite)?10:Long.parseLong(refreshAfterWrite), TimeUnit.SECONDS);
        if (StringUtils.isNotBlank(expireAfterAccess)) {
            caffeine.expireAfterAccess(Long.parseLong(expireAfterAccess), TimeUnit.SECONDS);
        }
        if (StringUtils.isNotBlank(expireAfterWrite)) {
            caffeine.expireAfterWrite(Long.parseLong(expireAfterWrite), TimeUnit.SECONDS);
        }
        cacheManager.setCaffeine(caffeine);
        cacheManager.setCacheLoader(cacheLoader());
        if (StringUtils.isEmpty(cacheNames)){
            cacheNames = "userCache,commonCache";
        }
        // 根據名字可以建立多個cache,但是多個cache使用相同的策略
        cacheManager.setCacheNames(Arrays.asList(cacheNames.split(",")));
        // 是否允許值為空
        cacheManager.setAllowNullValues(false);
        return cacheManager;
    }

快取引數引數設定:

caffine配置

caffeine.cacheNames=waybillSource

caffeine.initCapacity=32

caffeine.maxSize=64

#寫入多久後過期

caffeine.expireAfterWrite=1800

#寫入多久後過期過,同時請求先返回舊資料然後再載入

caffeine.refreshAfterWrite=600

        批次建立caffeine簡單方便,但因追求通用性缺乏個性化定製.比如所有快取過期時間、容量都是一致的,此時可以採用caffeine手動建立快取進行深度定製,同樣以渠道黑名單為例。

  • 根據api手動構建快取:

@Component
public class BlackWaybillSourceCache {
    private LoadingCache<String, List< BlackWaybillSource>> cache = Caffeine.newBuilder()
            .refreshAfterWrite(10, TimeUnit.MINUTES)
            .expireAfterWrite(30, TimeUnit.MINUTES)
            .initialCapacity(2)
            .maximumSize(2)
            .build(new CacheLoader<String, List< BlackWaybillSource>>() {
                @Nullable
                @Override
                public List<WaybillSource> load(@Nonnull String key) throws Exception {
                     .......
                    return JSONObject.parseArray(jsonString, BlackWaybillSource.class);
                }
            });

    public List< BlackWaybillSource> queryBlackWaybillSource() {
        return cache.get(WAYBILL_SOUERCE);
    }

    public void put(String key,String value){
        cache.put(key,JSONObject.parseArray(value,BlackWaybillSource.class));
    }

    public void evict(String key){
        cache.invalidate(key);
    }
}

        此時BlackWaybillSourceRedisMessageListener需實現onMessage,更新及刪除分別刪除上面WaybillSourceCache的put和evict方法實現回撥更新。

5、效果

功能上線前後Redis叢集併發量及執行命令數對比: 基於canal與Redis釋出訂閱構建讓應用起飛的多級快取方案及實踐

叢集IP 每秒併發量 每分鐘命令數
xx.78.32 4737—>1896 263400—>113400
xx.78.35 6184—>1716 351060—>103680
xx.78.36 6262—>2203 354840—>123540
xx.78.39 5409—>1656 307560—>101640

      從redis執行併發量及命令數來看,本地快取攔截很多請求,降低Redis叢集的負載,提升響應速度。

  • 根據記憶體dump計算出本地快取佔用記憶體約為60M左右;

  • 核心介面耗時下降約為19%;

  • redis命令數及併發量降低67%;

八、總結

        經過分析選型,用redis與caffeine構建兩級快取,用canal解決db和redis之間的資料同步,用redis釋出訂閱解決分散式快取與本地快取之間的資料同步,構建近乎實時的兩級快取,介面效能得到約19%的提升,redis叢集負載下降一半以上。


0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.