最近收到一个需求,需要做一套消息中心,需求倒是很简单,再用户有新消息时推送给web页面,提示用户有新消息未读,最初版本用短轮询方案实现,若是局部组件,可关闭页面后结束轮询,但是此消息中心是一个全局组件,只要开启页面就开始轮询,体验不佳,看着network密密麻麻的请求头都大了,随使用SSE方案。
对于这种消息推送目前几个成熟方案:
- 客户端轮询 (短轮询)
- 服务端轮询 (长轮询)
- WebSocket
- SSE (Serve-Send Events)
各方案间区别:
* |
客户端轮询 |
服务端轮询 |
WebSocket |
SSE |
协议 |
http |
http |
tcp |
http |
优点 |
实现方便,兼容性好 |
同短轮询,但比短轮询节约资源,相对短轮询请求次数少 |
双全工通信协议,性能开销相对较小,可双向通信 |
H5规范的一部分,无需安装直接使用;资源占用小;前端部分实现极其简单 |
缺点 |
占用较多内存和请求数;污染network列表 |
同短轮询 |
开发成本高;相对sse资源开销大 |
单向推送;兼容性问题;只能get请求,且请求头无法加内容(或者使用第三方封装sse插件) |
SSE兼容问题:
SSE实现
1. 前端实现 + visibilitychange性能优化
经过测试环境几天的运行发现了新的问题,当用户挂机时,js代码也在正常的跑,如果用户忘记关闭页面,且电脑从不关机,就会导致页面请求每隔一小时发送一次,实际绝大部分挂机时间不需要维持此链接,那么我们可以通过visibilitychange
事件优化我们的消息通知
通过mdn文档可知,我们可以通过visible
和hidden
去优化我们的代码
以下代码为最新代码,带浏览器降级处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| const RETRY_TIME = 15 * 1000
mounted() { this.initNotify() this.initListener() },
methods: { initNotify() { if ('EventSource' in window) { this.getUnReadNoticeCount() this.registerEvent() } else { clearTimeout(this.timer) this.loopFetch() } }, initListener() { document.addEventListener('visibilitychange', this.listeningChange) }, listeningChange() { if (document.visibilityState === 'hidden') { this.clearAllTimer() } else if (this.eventSource) { if (this.eventSource.readyState === 2) { this.registerEvent() } } else { this.loopFetch() } }, registerEvent() { if (document.visibilityState === 'visible') { const now = new Date().getTime() if (!this.sourceTime || (now - this.sourceTime) >= RETRY_TIME) { this.sourceTime = new Date().getTime() const user = getUserInfo() const userId = user ? JSON.parse(user).userId : '' this.eventSource = new EventSource(notifyNumUrl(userId)) console.log('eventSource: ', this.eventSource); this.eventSource.onopen = () => { clearTimeout(this.errorTimer) } this.eventSource.onmessage = ({ data }) => { if (data) { this.noticeNum = data } } this.eventSource.onerror = (error) => { if (error.currentTarget.readyState === 2) { this.eventSource.close && this.eventSource.close() this.registerEvent() } } } else { this.errorTimer = setTimeout(() => { this.registerEvent() }, RETRY_TIME) } } }, loopFetch() { this.getUnReadNoticeCount() this.timer = setTimeout(() => { this.loopFetch() }, 1000 * 30) }, getUnReadNoticeCount() { }, clearAllTimer() { if (this.eventSource) { clearTimeout(this.errorTimer) } else { clearTimeout(this.timer) } }, },
beforeDestroy() { document.removeEventListener('visibilitychange', this.listeningChange) this.clearAllTimer() this.eventSource && this.eventSource.close && this.eventSource.close() },
|
2. 后端实现 JAVA Spring Web MVC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| @RestController @RequestMapping("/sse") @Api(tags = "Sse消息通知") @Slf4j public class SseController extends BaseController { @Resource private SseService sseService;
@GetMapping("/create") public Object createSession(String userId) throws IOException { return sseService.createSession(userId); }
@GetMapping("/close") public void closeSession(String userId) throws IOException { sseService.closeSession(userId); } }
@DubboService @Service @Slf4j public class SseServiceImpl implements SseService {
public final static Map<String, SseEmitter> SSE_CACHE= new ConcurrentHashMap<>();
public final static Map<String, Integer> SEND_RECORD= new ConcurrentHashMap<>();
public synchronized SseEmitter createSession(String clientId) throws IOException{ SEND_RECORD.remove(clientId); SseEmitter sseEmitter = new SseEmitter(0L); SSE_CACHE.put(clientId,sseEmitter); log.info("客户端:{} 新建连接成功,当前客户端总数为【{}】",clientId,SSE_CACHE.size() ); return sseEmitter; }
@Override public void closeSession(String clientId) { if (SSE_CACHE.containsKey(clientId)){ SSE_CACHE.get(clientId).complete(); SSE_CACHE.remove(clientId); SEND_RECORD.remove(clientId); log.info("客户端:【{}】 断开成功,当前剩余客户端总数为【{}】",clientId,SSE_CACHE.size()); } } }
@XxlJob("sendUnRead") public void sendUnRead(){ if (SseServiceImpl.SSE_CACHE.size()>0){ List<UnReadNoticePO> unReadNoticePOS = noticeService.countAllUnReadNotice(new ArrayList<>(SseServiceImpl.SSE_CACHE.keySet())); log.info("stringIntegerMap",unReadNoticePOS); if(CollectionUtil.isNotEmpty(unReadNoticePOS)){
for (Map.Entry<String, SseEmitter> entry : SseServiceImpl.SSE_CACHE.entrySet()) { String key = entry.getKey(); Optional<UnReadNoticePO> first = unReadNoticePOS.stream().filter(u -> u.getUserId().equals(key)).findFirst();
if(first.isPresent()){ log.info("first",first.get()); SseEmitter sseEmitter = SseServiceImpl.SSE_CACHE.get(key); try { Integer lastNum = SseServiceImpl.SEND_RECORD.get(first.get().getUserId()); if(ObjectUtil.isEmpty(lastNum) || lastNum!=first.get().getNum()){ sseEmitter.send(SseEmitter.event().reconnectTime(1000).id(entry.getKey()).data(first.get().getNum())); log.info(entry.getKey()+"发送消息成功,内容:{}",first.get().getNum()); } SseServiceImpl.SEND_RECORD.put(first.get().getUserId(),first.get().getNum()); }catch (IOException e){ SseServiceImpl.SSE_CACHE.remove(entry.getKey()); log.error(entry.getKey()+"消息发送失败,通道已关闭!",e);
} } } } } }
|
3. 运维
运维需要配合修改nginx
配置的proxy-read-timeout
超时时间,目前我们方案超时时间一小时。
成品展示
此时只保留一条http请求,后端轮询到新消息就推送到前端,若前端页面不显示或被隐藏(锁屏切标签等)则不去请求,若页面显示则自动恢复请求。方便快捷,体验更好