- 工信部備案號 滇ICP備05000110號-1
- 滇公安備案 滇53010302000111
- 增值電信業(yè)務經營許可證 B1.B2-20181647、滇B1.B2-20190004
- 云南互聯(lián)網協(xié)會理事單位
- 安全聯(lián)盟認證網站身份V標記
- 域名注冊服務機構許可:滇D3-20230001
- 代理域名注冊服務機構:新網數(shù)碼
代碼分析事務消息
分布式事務模型圖如下:
我們先看事務消息客戶端的實現(xiàn)。即上圖的MQClient.
我們先看代碼的整體封裝。
下面代碼段做了兩件事:
1.本地數(shù)據庫寫入 2.事務消息客戶端發(fā)送消息。
@Transactional表示開啟事務。在注釋下,開啟一個新的Order。用OrderDao將數(shù)據寫入本地數(shù)據庫。然后調用transactionMsgClient.sendMsg將消息發(fā)出去(這是靜態(tài)方法調用,transactionMsgClient是一個類,sendMsg是它的方法)。這樣,本地數(shù)據庫寫入和發(fā)消息這兩件事,就是個原子事務,也就是說,兩件事要么一起成功,要么一起失敗。
上面代碼用到了MybatisTransactionMsgClient。MyBatis是一個Java持久化框架,它通過XML描述符或注解把對象與存儲過程或SQL語句關聯(lián)起來,映射成數(shù)據庫內對應的記錄。
上面提到過,transactionMsgClient.sendMsg是個類,這個類繼承了TransactionMsgClient。下面代碼中,transactionMsgClient.sendMsg調用了其父類的TransactionsendClient的sendMsg方法,寫事務消息表,并且發(fā)送消息。
我們接下來看sendMsg這個方法到底做了什么:
1、消息內容落數(shù)據庫 2、發(fā)送消息
下面代碼會先做一個判斷,在if字段里:con.getAutoCommit。也就是說,只有當沒有開啟自動commit的時候(有自動提交就破壞了事務的原子性),把信息寫在數(shù)據庫表里,然后構造一個messages,發(fā)消息。而發(fā)消息的方法是將消息放到消息隊列中。
在事務消息設計中,后臺發(fā)送消息隊列設計,如下圖所示:
參照上圖,我們可以看到后臺發(fā)送消息隊列有兩個:
SendMsg隊列的消息消費很快,基本上放進去很快就會被消費掉。這樣重試才能有效,否則一直重試,沒有意義。
下面代碼段的的作用:是發(fā)送消息時,從隊列里中取消息,看是否到期,將到期的消息取出來:
后臺優(yōu)先隊列的維護。
最后,事務消息表的設計;
代碼分析事務消息
我們知道,RocketMQ支持延時消息。我們先看一下延時消息的應用場景。
延時需求是在當前時間后的某一時間點觸發(fā)指定的業(yè)務邏輯或操作。
例如微信發(fā)消息,過一段時間沒發(fā)送成功的話,提示重新發(fā)送(如下圖的小紅點提示)。
訂單狀態(tài)流轉:如延時支付。京東上超過24小時的訂單會被自動取消。
實際上,我們在分布式事務的終極模型中,也用到了延時消息。
RocketMQ支持18個級別的延時等級:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h。也就是說,消息延時發(fā)送有這18個級別。如果業(yè)務的延時消息需求與這18個級別不匹配,就需要自行基于RocketMQ進行二次開發(fā)。
接下來,我們看一下18個延時的實現(xiàn)原理。
RocketMQ的18個延時級別,每個級別對應一個Queue,根據Level參數(shù),將消息放到對應的18個隊列中的等級。每個隊列都對應了到時出隊。例如1s隊列就是1s出隊,2小時隊就是2小時出隊。消息出隊以后,當成正常消息投遞。然后被投遞到了消費隊列,被消費者消費掉。
提交成功!非常感謝您的反饋,我們會繼續(xù)努力做到更好!
這條文檔是否有幫助解決問題?
售前咨詢
售后咨詢
備案咨詢
二維碼
TOP