摘要:無(wú)證連接進(jìn)行異常記錄并關(guān)閉連接。離線消息檢測(cè)到上線立即推送這是消息推送需要實(shí)現(xiàn)的基本功能之一了,詳見代碼。主要功能協(xié)助進(jìn)行初始化,心跳包檢測(cè),斷線自動(dòng)重連消息推送的第二種方式在下篇中再編寫
消息重發(fā)中需要注意的問題
由于最近工作中接觸了比較多關(guān)閉消息推送以及異常重發(fā)機(jī)制的問題,終于得空總結(jié)一下經(jīng)驗(yàn)
目前接觸的消息推送分為兩種
主動(dòng)推送:一般為websocket建立長(zhǎng)連接實(shí)現(xiàn),此處網(wǎng)上多有各種實(shí)現(xiàn)方式。下面貼出本人結(jié)合實(shí)際應(yīng)用場(chǎng)景使用的長(zhǎng)連接方式。
websocket服務(wù)端代碼
import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import javax.annotation.PostConstruct; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint(value = "/websocket/{id}") @Component @Slf4j public class WebSocket { // 靜態(tài)變量,用來(lái)記錄當(dāng)前在線連接數(shù)。應(yīng)該把它設(shè)計(jì)成線程安全的。 private static int onlineCount = 0; // concurrent包的線程安全Set,用來(lái)存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象。 private static ConcurrentHashMapwebSocketSet = new ConcurrentHashMap<>(); // 保存允許建立連接的id private static List idList = Lists.newArrayList(); private String id = ""; /** * 這里使用AutoWired注入的bean會(huì)出現(xiàn)無(wú)法持續(xù)保存而出現(xiàn)null的情況。 * 具體原因暫時(shí)沒有深究,如果有需要時(shí),可以再init初始化方法中手動(dòng)將臨時(shí)的beanTmp類存入static常量中即可正常使用該bean類。 * @Autowired * private RedisCacheUtil redisTmp; * private static RedisCacheUtil redis; * */ // 與某個(gè)客戶端的連接會(huì)話,需要通過它來(lái)給客戶端發(fā)送數(shù)據(jù) private Session session; public void closeConn(String appId) { // 關(guān)閉連接 try { WebSocket socket = webSocketSet.get(appId); if (null != socket) { if (socket.session.isOpen()) { socket.session.close(); } } } catch (IOException e) { System.out.println("IO異常"); e.printStackTrace(); } idList.remove(appId); } /** * 連接/注冊(cè)時(shí)去重 */ public void conn(String appId) { // 去重 if (!idList.contains(appId)) { idList.add(appId); } } /** * 獲取注冊(cè)在websocket進(jìn)行連接的id */ public static List getIdList() { return idList; } /** * 初始化方法 * @author caoting * @date 2019年2月13日 */ @PostConstruct public void init() { try { /** * TODO 這里的設(shè)計(jì)是在項(xiàng)目啟動(dòng)時(shí)從DB或者緩存中獲取注冊(cè)了允許建立連接的id * 然后將獲取到的id存入內(nèi)存--idList * // 從數(shù)據(jù)庫(kù)獲取idList * List ids = wsIdsServiceTmp.selectList(null); */ // TODO 初始化時(shí)將剛注入的對(duì)象進(jìn)行靜態(tài)保存 // redis = redisTmp; } catch (Exception e) { // TODO 項(xiàng)目啟動(dòng)錯(cuò)誤信息 } } /** * 連接啟動(dòng)時(shí)查詢是否有滯留的新郵件提醒 * @param id * * @author caoting * @throws IOException * @date 2019年2月28日 */ private void selectOfflineMail(String id) throws IOException { // 查詢緩存中是否存在離線郵件消息 Jedis jedis = redis.getConnection(); try { List mails = jedis.lrange(Constant.MAIL_OFFLINE+id, 0, -1); if (CommomUtil.isNotEmpty(mails)) { for (String mailuuid : mails) { String mail = jedis.get(mailuuid); if (StringUtils.isNotEmpty(mail)) sendToUser(Constant.MESSAGE_MAIL + mail, id); Thread.sleep(1000); } // 發(fā)送完成從緩存中移除 jedis.del(Constant.MAIL_OFFLINE+id); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { jedis.close(); } } /** * 連接建立成功調(diào)用的方法 * @param id */ @OnOpen public void onOpen(@PathParam(value = "id") String id, Session session) { try { // 注:ws-admin是管理員內(nèi)部使用通道 不受監(jiān)控 謹(jǐn)慎使用 if (!id.contains(Constant.WS_ADMIN)) { this.session = session; this.id = id;//接收到發(fā)送消息的人員編號(hào) // 驗(yàn)證id是否在允許 if (idList.contains(id)) { // 判斷是否已存在相同id WebSocket socket = webSocketSet.get(id); if (socket == null) { webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數(shù)加1 this.sendMessage("Hello:::" + id); System.out.println("用戶"+id+"加入!當(dāng)前在線人數(shù)為" + getOnlineCount()); // 檢查是否存在離線推送消息 selectOfflineMail(id); } else { this.sendMessage(Constant.MESSAGE_ERROR+"連接id重復(fù)--連接即將關(guān)閉"); this.session.close(); } } else { // 查詢數(shù)據(jù)庫(kù)中是否存在數(shù)據(jù) WsIds wsIds = wsIdsService.selectByAppId(id); if ( null != wsIds ) { idList.add(id); webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數(shù)加1 this.sendMessage("Hello:::" + id); log.debug("用戶"+id+"加入!當(dāng)前在線人數(shù)為" + getOnlineCount()); // 檢查是否存在離線推送消息 selectOfflineMail(id); } else { // 關(guān)閉 this.sendMessage(Constant.MESSAGE_ERROR+"暫無(wú)連接權(quán)限,連接即將關(guān)閉,請(qǐng)確認(rèn)連接申請(qǐng)是否過期!"); this.session.close(); log.warn("有異常應(yīng)用嘗試與服務(wù)器進(jìn)行長(zhǎng)連接 使用id為:"+id); } } } else { this.session = session; this.id = id;//接收到發(fā)送消息的人員編號(hào) webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數(shù)加1 this.sendMessage("Hello:::" + id); log.debug("用戶"+id+"加入!當(dāng)前在線人數(shù)為" + getOnlineCount()); } } catch (IOException e) { e.printStackTrace(); } } /** * 連接關(guān)閉調(diào)用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this.id); // 從set中刪除 subOnlineCount(); // 在線數(shù)減1 log.debug("有一連接關(guān)閉!當(dāng)前在線人數(shù)為" + getOnlineCount()); } /** * 收到客戶端消息后調(diào)用的方法 * * @param message * 客戶端發(fā)送過來(lái)的消息 */ @OnMessage public void onMessage(String message, Session session) { log.debug("來(lái)自客戶端的消息:" + message); // TODO 收到客戶端消息后的操作 } /** * 發(fā)生錯(cuò)誤時(shí)調(diào)用 */ @OnError public void onError(Session session, Throwable error) { log.debug("發(fā)生錯(cuò)誤"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getAsyncRemote().sendText(message); } /** * 發(fā)送信息給指定ID用戶,如果用戶不在線則返回不在線信息給自己 * @param message * @param sendUserId * @throws IOException */ public Boolean sendToUser(String message, String sendUserId) throws IOException { Boolean flag = true; WebSocket socket = webSocketSet.get(sendUserId); if (socket != null) { try { if (socket.session.isOpen()) { socket.sendMessage(message); } else { flag = false; } } catch (Exception e) { flag = false; e.printStackTrace(); } } else { flag = false; log.warn("【" + sendUserId + "】 該用戶不在線"); } return flag; } /** * 群發(fā)自定義消息 */ public void sendToAll(String message) throws IOException { for (String key : webSocketSet.keySet()) { try { WebSocket socket = webSocketSet.get(key); if (socket.session.isOpen()) { socket.sendMessage(message); } } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { if (WebSocket.onlineCount > 0) WebSocket.onlineCount--; } }
這里使用的是較為原始的websocket連接方式,事實(shí)上springboot已經(jīng)融合了websocket,工作關(guān)系沒有空暫未研究。記錄一下有空了再去寫寫demo。這個(gè)socket服務(wù)端主要實(shí)現(xiàn)了:1. 連接控制,建立連接時(shí)驗(yàn)證id的合法性。無(wú)證連接進(jìn)行異常記錄并關(guān)閉連接。2. 離線消息檢測(cè)到上線立即推送 這是消息推送需要實(shí)現(xiàn)的基本功能之一了,詳見代碼。3. 統(tǒng)計(jì)在線人數(shù) 依舊是基本功能
下面是websocket服務(wù)端配置類WebSocketServerConfig
import lombok.extern.slf4j.Slf4j; import org.apache.catalina.session.StandardSessionFacade; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; import javax.servlet.http.HttpSession; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; import javax.websocket.server.ServerEndpointConfig.Configurator; @Configuration @Slf4j public class WebSocketServerConfig extends Configurator { @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { /* 如果沒有監(jiān)聽器,那么這里獲取到的HttpSession是null */ StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession(); if (ssf != null) { HttpSession httpSession = (HttpSession) request.getHttpSession(); // 關(guān)鍵操作 sec.getUserProperties().put("sessionId", httpSession.getId()); log.debug("獲取到的SessionID:" + httpSession.getId()); } } /** * 如果使用獨(dú)立的servlet容器,而不是直接使用springboot的內(nèi)置容器 * 就不要注入ServerEndpointExporter,因?yàn)樗鼘⒂扇萜髯约禾峁┖凸芾怼? * 即:生產(chǎn)環(huán)境中在獨(dú)立的tomcat運(yùn)行時(shí)請(qǐng)注釋掉這個(gè)bean * * @return * * @author caoting * @date 2019年2月20日 */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
這里其實(shí)有個(gè)坑,就是上述代碼中的bean類 serverEndpointExporter,開發(fā)環(huán)境如果不是配置獨(dú)立的tomcat運(yùn)行的話是需要注入的,但是生產(chǎn)環(huán)境下在獨(dú)立的tomcat容器運(yùn)行時(shí)是需要注釋掉的,否則會(huì)報(bào)錯(cuò)。
很重要的session監(jiān)聽器 RequestListener
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.servlet.ServletRequestEvent; import javax.servlet.ServletRequestListener; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; /** * 監(jiān)聽器類:主要任務(wù)是用ServletRequest將我們的HttpSession攜帶過去 * 此注解千萬(wàn)千萬(wàn)不要忘記,它的主要作用就是將這個(gè)監(jiān)聽器納入到Spring容器中進(jìn)行管理,相當(dāng)于注冊(cè)監(jiān)聽 */ @Component @Slf4j public class RequestListener implements ServletRequestListener { @Override public void requestInitialized(ServletRequestEvent sre) { // 將所有request請(qǐng)求都攜帶上httpSession HttpSession httpSession = ((HttpServletRequest) sre.getServletRequest()).getSession(); log.debug("將所有request請(qǐng)求都攜帶上httpSession " + httpSession.getId()); } public RequestListener() { } @Override public void requestDestroyed(ServletRequestEvent arg0) { } }
以上就是一個(gè)websocket服務(wù)端需要的所有配置和類
websocket客戶端代碼
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.curator.shaded.com.google.common.collect.Maps; import redis.clients.jedis.Jedis; import javax.websocket.*; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; /** * @author caoting * @date 2018年9月27日 */ @Slf4j @ClientEndpoint public class MailWebSocketClient { private static RedisCacheUtil redis; protected void setRedis(RedisCacheUtil redisTmp) { redis = redisTmp; } /** * @author caoting * @date 2019年3月11日 */ public static void doSomething() { // TODO 由于這個(gè)類沒有寫初始化方法,但是有些初始化操作必須完成, // 因此在socket配置類中調(diào)用此方法可以完成一些需要初始化注入的操作 } private Session session; @OnOpen public void open(Session session) { log.info("連接開啟..."); this.session = session; } @OnMessage public void onMessage(String message) { log.info("來(lái)自服務(wù)端的消息: " + message); // TODO 對(duì)消息進(jìn)行過濾判斷處理 // 不做過多操作影響性能 直接交給異步任務(wù)處理--這個(gè)辦法還是比較low的現(xiàn)在springboot有更好的解決辦法@Async 有空再記錄下多線程異步處理任務(wù)調(diào)度的相關(guān)代碼。 ExecutorService executor = Executors.newSingleThreadExecutor(); FutureTaskfuture = new FutureTask (new Callable () {// 使用Callable接口作為構(gòu)造參數(shù) public Boolean call() { return pushMsg(message); } }); executor.execute(future); Boolean res = CommomUtil.timeOutTask(future, executor, 600); if (res != null && res) log.info("操作成功"); else log.info("操作失敗"); } /** * @author caoting * @date 2019年3月11日 */ private Boolean pushMailMsg(String message) { Boolean flag = true; // 推送消息 ReceiverRes resObj = new ReceiverRes(); try { resObj = restTemplate.httpPostMediaTypeJson(url, ReceiverRes.class, message); } catch (Exception e) { // 這里異常一般是http接口服務(wù)宕機(jī)了,所以放進(jìn)緩存在對(duì)方上線時(shí)進(jìn)行重新推送 resObj.setCode(500); log.error(e.getMessage(), e); } // ====推送完成后的后續(xù)異常檢查與數(shù)據(jù)重發(fā)工作 這里是一個(gè)redis任務(wù)調(diào)度 處理失敗任務(wù)的典型案例 看不懂就刪掉 Integer code = resObj.getCode(); if (code == 500) { // 發(fā)送失敗存進(jìn)redis緩存 按照約定好的狀態(tài)碼進(jìn)行判斷 jedis.lpush(Constant.PUSH_ERROR, mailMapJson); } else { // 發(fā)送成功以后查詢以前出錯(cuò)的數(shù)據(jù)進(jìn)行重新推送。--這種辦法只適合消息很頻繁的,畢竟不頻繁的等下次發(fā)消息又不知道是何時(shí)了,因此需要采用別的方法 while (true) { // 查詢以往的異常發(fā)送數(shù)據(jù) 重新發(fā)送 String jsonMap = jedis.rpoplpush(Constant.PUSH_ERROR, Constant.PUSH_ERROR_TMP); if (StringUtils.isEmpty(jsonMap)) { break; } try { errObj = restTemplate.httpPostMediaTypeJson(receiverUrl, ReceiverRes.class, message); } catch (Exception e) { errObj.setCode(500); log.error(e.getMessage(), e); } if (errObj.getCode() == 500) { // 再次失敗 彈回原隊(duì)列 jedis.rpoplpush(Constant.PUSH_ERROR_TMP, Constant.PUSH_ERROR); } else { jedis.rpop(Constant.PUSH_ERROR_TMP); } } } return flag; } @OnClose public void onClose() { log.info("長(zhǎng)連接關(guān)閉..."); } @OnError public void onError(Session session, Throwable t) { t.printStackTrace(); } public void send(String message) { this.session.getAsyncRemote().sendText(message); } public void close() throws IOException { if (this.session.isOpen()) { this.session.close(); } } }
上面是websocket客戶端的代碼。其中主要有:1、http推送失敗重發(fā)機(jī)制 2、redis任務(wù)調(diào)度經(jīng)典案例
websocket客戶端配置類WebSocketConfig
import com.hnpolice.business.service.ApplicationService; import com.hnpolice.sso.common.ex.BaseException; import com.hnpolice.sso.common.utils.RedisCacheUtil; import com.hnpolice.sync.RestTemplateFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.websocket.ContainerProvider; import javax.websocket.WebSocketContainer; import java.net.URI; @Slf4j @Component public class WebSocketConfig implements ApplicationRunner { @Autowired private RedisCacheUtil redisTmp; private static Boolean isOk; private MailWebSocketClient client; private static WebSocketContainer conmtainer = ContainerProvider.getWebSocketContainer(); @Override public void run(ApplicationArguments args) throws Exception { // 跟隨項(xiàng)目啟動(dòng)的方法可以在這里做一些初始化工作 // websocket客戶端初始化 wsClientInit(); } public void wsClientInit() { try { client = new MailWebSocketClient(); client.setRedis(redisTmp); MailWebSocketClient.dosomething(); conmtainer.connectToServer(client, new URI(##socket服務(wù)連接地址##)); isOk = true; } catch (Exception e) { isOk = false; log.error(e); } // 斷線重連 while (true) { if (isOk != null && isOk) { try { client.send("ping:"+appId); } catch (Exception e) { isOk = false; } } else { // 系統(tǒng)連接失敗進(jìn)行重試 log.warn("系統(tǒng)連接失敗,正在重連..."); try { client.send("ping:"+appId); log.warn("系統(tǒng)重連成功!"); isOk = true; } catch (Exception e) { try { client = new MailWebSocketClient(); conmtainer.connectToServer(client, new URI(mailUrl)); isOk = true; } catch (Exception e1) { isOk = false; } if (isOk != null && isOk) { log.warn("系統(tǒng)重連成功!"); } } } try { Thread.sleep(30000); } catch (InterruptedException e) { log.error(BaseException.collectExceptionStackMsg(e)); e.printStackTrace(); } } } }
這是websocket客戶端的配置類,實(shí)現(xiàn)ApplicationRunner 接口是為了在項(xiàng)目啟動(dòng)時(shí)完成一些初始化工作,并非必須。主要功能:1、協(xié)助websocketCient進(jìn)行初始化,2、心跳包檢測(cè),斷線自動(dòng)重連
消息推送的第二種方式在下篇中再編寫
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/74103.html
摘要:修改記錄版本的通知欄消息功能上并未發(fā)生變化,右上角的縮減為了。增加了,允許可穿戴設(shè)備遠(yuǎn)程控制通知欄消息。鎖屏狀態(tài)下,可以控制通知欄消息的隱私程度。但是谷歌規(guī)定,自定義布局展示的通知欄消息最大高度是。具體適配不正常的機(jī)型有。 此文已由作者黎星授權(quán)網(wǎng)易云社區(qū)發(fā)布。 歡迎訪問網(wǎng)易云社區(qū),了解更多網(wǎng)易技術(shù)產(chǎn)品運(yùn)營(yíng)經(jīng)驗(yàn)。 由于歷史原因,Android在發(fā)布之初對(duì)通知欄Notification的設(shè)...
摘要:在對(duì)事實(shí)性要求沒有那么高的情況下,可以用基于最大努力交付消息隊(duì)列以及消息存儲(chǔ)來(lái)解決最終一致性。可靠消息服務(wù)和消息組件,協(xié)調(diào)上下游消息的傳遞,并確保上下游數(shù)據(jù)的一致性。下游應(yīng)用通知可靠消息服務(wù)該消息已經(jīng)成功消費(fèi)。 本文對(duì)比 二階段事務(wù)、最大努力交付以及消息最終一致性,并給出部分解決方案,最終一致性方案參考阿里RockMQ事務(wù)消息:http://blog.csdn.net/chunlong...
摘要:一小小推廣講座本話題已收入視頻講座分布式事務(wù)解決方案大家不妨圍觀下開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。 一 小小推廣 講座 本話題已收入視頻講座《Spring Cloud分布式事務(wù)解決方案》大家不妨圍觀下 開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀??梢詤⒖糋ithub CoolMQ源碼,項(xiàng)目支持網(wǎng)站: http:/...
摘要:在端,盡管開發(fā)人員對(duì)其功能的需求很高,但出于某些原因,推送通知被引入的時(shí)間比較晚。發(fā)送推送通知在服務(wù)器上實(shí)現(xiàn)調(diào)用,該調(diào)用觸發(fā)到用戶設(shè)備的推送消息。推送服務(wù)推送服務(wù)是接收請(qǐng)求驗(yàn)證請(qǐng)求并將推送消息發(fā)送到對(duì)應(yīng)的瀏覽器。 這是專門探索 JavaScript 及其所構(gòu)建的組件的系列文章的第9篇。 想閱讀更多優(yōu)質(zhì)文章請(qǐng)猛戳GitHub博客,一年百來(lái)篇優(yōu)質(zhì)文章等著你! 如果你錯(cuò)過了前面的章節(jié),可以在...
閱讀 3010·2023-04-26 02:49
閱讀 3529·2021-11-25 09:43
閱讀 3724·2021-10-09 09:43
閱讀 3175·2021-09-28 09:44
閱讀 2541·2021-09-22 15:29
閱讀 4741·2021-09-14 18:02
閱讀 2852·2021-09-03 10:48
閱讀 3486·2019-08-30 12:47