当前位置:网站首页>I have seven schemes to realize web real-time message push, seven!
I have seven schemes to realize web real-time message push, seven!
2022-07-25 05:10:00 【InfoQ】


What is message push (push)
pushweb End message push Mobile message push 

+1
pushpullShort polling
polling Short polling Long polling HTTPsetInterval(() => {
// Method request
messageCount().then((res) => {
if (res.code === 200) {
this.messageCount = res.data
}
})
}, 1000);

Long polling
NacosapollokafkaRocketMQNacosapolloDeferredResultservelet3.0
DeferredResultDeferredResult.setResult(200)guavaMultimap@Controller
@RequestMapping("/polling")
public class PollingController {
// Store and monitor a Id Long polling set for
// Thread synchronization structure
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());
/**
* official account : Programmer Xiaofu
* Set listening
*/
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
// Delay object setting timeout
DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
// Remove when the asynchronous request completes key, Prevent memory overflow
deferredResult.onCompletion(() -> {
watchRequests.remove(id, deferredResult);
});
// Registrar polling request
watchRequests.put(id, deferredResult);
return deferredResult;
}
/**
* official account : Programmer Xiaofu
* Change data
*/
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
// Data changes Take out the monitor ID All long polling requests , And respond one by one
if (watchRequests.containsKey(id)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult(" I updated " + new Date());
}
}
return "success";
}
AsyncRequestTimeoutException@ControllerAdvice@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println(" Asynchronous request timed out ");
return "304";
}
}
/polling/watch/10086/polling/publish/10086
iframe flow
<iframe>srciframeHTMLjavascript
<iframe><iframe src="/iframe/message" style="display:none"></iframe>
response@Controller
@RequestMapping("/iframe")
public class IframeController {
@GetMapping(path = "message")
public void message(HttpServletResponse response) throws IOException, InterruptedException {
while (true) {
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().print(" <script type=\"text/javascript\">\n" +
"parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" +
"parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" +
"</script>");
}
}
}

SSE ( My way )
WebSocketServer-sent eventsSSESSEHTTP
text/event-stream
SSEWebSocket- SSE Is based on HTTP Agreed , They do not require special protocols or server implementations to work ;
WebSocketA separate server is required to handle the Protocol .
- SSE One-way communication , Only one-way communication from the server to the client ;webSocket Full duplex communication , That is, both sides of the communication can send and receive information at the same time .
- SSE Simple implementation and low development cost , There is no need to introduce other components ;WebSocket Data transmission requires secondary analysis , The development threshold is higher .
- SSE Disconnection and reconnection are supported by default ;WebSocket You need to do it yourself .
- SSE Only text messages can be sent , Binary data needs to be encoded before transmission ;WebSocket Binary data transfer is supported by default .
SEEWebSockets Automatically reconnect event ID Send any event <script>
let source = null;
let userId = 7777
if (window.EventSource) {
// Establishing a connection
source = new EventSource('http://localhost:7777/sse/sub/'+userId);
setMessageInnerHTML(" Connect the user =" + userId);
/**
* Once the connection is established , It will trigger open event
* Another way of writing :source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML(" Establishing a connection ...");
}, false);
/**
* The client receives the data from the server
* Another way of writing :source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
} else {
setMessageInnerHTML(" Your browser does not support it SSE");
}
</script>
SseEmittersseEmitterMapprivate static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* Create connection
*
* @date: 2022/7/12 14:51
* @auther: official account : Programmer Xiaofu
*/
public static SseEmitter connect(String userId) {
try {
// Set timeout ,0 No expiration date . Default 30 second
SseEmitter sseEmitter = new SseEmitter(0L);
// Register callback
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info(" Create a new sse Abnormal connection , The current user :{}", userId);
}
return null;
}
/**
* Send a message to the specified user
*
* @date: 2022/7/12 14:51
* @auther: official account : Programmer Xiaofu
*/
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
} catch (IOException e) {
log.error(" user [{}] Push exception :{}", userId, e.getMessage());
removeUser(userId);
}
}
}

IE
MQTT
MQTTpublishsubscribe Lightweight Internet of Thingpublishersubscriber
TCPMQTTMQTTTCP/IPTCP/IPMQTTMQTTHTTP- First
HTTPProtocol, which is a synchronization protocol , After the client requests, it needs to wait for the response of the server . And in the Internet of things (IOT) Environment , The equipment will be subject to the influence of the environment , Such as low bandwidth 、 High network latency 、 Unstable network communication, etc , Obviously, asynchronous messaging protocol is more suitable forIOTApplications .
HTTPIs one-way , If you want to get a message, the client must initiate a connection , And in the Internet of things (IOT) In the application , Devices or sensors are often clients , This means that they cannot passively receive commands from the network .
- Usually you need to send a command or message , Send to all devices on the network .
HTTPIt is not only difficult to realize such a function , And the cost is very high .
Websocket
websocketTCP
websocket<!-- introduce websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@ServerEndpointws://localhost:7777/webSocket/10086@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
// A connection session with a client , It is needed to send data to the client
private Session session;
private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
// Used to store the number of online connections
private static final Map<String, Session> sessionPool = new HashMap<String, Session>();
/**
* official account : Programmer Xiaofu
* Link the method successfully called
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("websocket news : There are new connections , The total number is :" + webSockets.size());
} catch (Exception e) {
}
}
/**
* official account : Programmer Xiaofu
* Method called upon receipt of a client message
*/
@OnMessage
public void onMessage(String message) {
log.info("websocket news : Received client message :" + message);
}
/**
* official account : Programmer Xiaofu
* This is a single message
*/
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("websocket eliminate : Single point message :" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
<script>
var ws = new WebSocket('ws://localhost:7777/webSocket/10086');
// Get connection status
console.log('ws Connection status :' + ws.readyState);
// Monitor whether the connection is successful
ws.onopen = function () {
console.log('ws Connection status :' + ws.readyState);
// If the connection is successful, send a data
ws.send('test1');
}
// Answer the information sent back by the server and process the display
ws.onmessage = function (data) {
console.log(' Received a message from the server :');
console.log(data);
// Close after communication WebSocket Connect
ws.close();
}
// Listen for connection close events
ws.onclose = function () {
// Monitor the whole process websocket The state of
console.log('ws Connection status :' + ws.readyState);
}
// Monitor and process error event
ws.onerror = function (error) {
console.log(error);
}
function sendMessage() {
var content = $("#message").val();
$.ajax({
url: '/socket/publish?userId=10086&message=' + content,
type: 'GET',
data: { "id": "7777", "content": content },
success: function (data) {
console.log(data)
}
})
}
</script>


Custom push

Github Address
Github
边栏推荐
- How to ensure data consistency between MySQL and redis?
- China trifluoroethanol industry research and investment forecast report (2022 Edition)
- How can test / development programmers with 5 years of experience break through the technical bottleneck? Common problems in big factories
- deep报错
- STL notes (VI): container vector
- Implementation principle of epoll
- Openworm project compilation
- Dragon Dragon community released the first Anolis OS Security Guide to escort users' business systems
- Redis的三个必知必会的问题
- Blog Description & message board
猜你喜欢

The third day of rhcsa summer vacation

2022-7-15 summary

初步了解Panda3d粒子系统

STL notes (I): knowledge system

Gradle test and idea test

Go language function

Implementation of recommendation system collaborative filtering in spark

Implementation principle of epoll

Summary and Prospect of aut, the transport layer protocol of sound network -- dev for dev column

Document collaboration tool recommendation
随机推荐
Etcd learning
[analysis of GPIO register (crl/crh) configuration of STM32]
Logu p3398 hamsters find sugar solution
Interviewer: explain the core principle of ThreadLocal
The market is right
Event cycle mechanism browser nodejs async await execution sequence promise execution sequence interview questions
OA and fansoft Bi cross system users, departments and posts synchronous summary
[live review] AI customer service "changes according to the situation", and man-machine dialogue can be easier
DOM processing in ahooks
Why does the official not recommend using UUID as MySQL primary key
STM32 Development Notes 119: what macros are required to enable FPU?
STM32 Development Notes 118: using CMSIS DSP Library in stm32cube IDE
[small program practice] first day
STL notes (VI): container vector
300. Longest increasing subsequence
哪种网站适合物理服务器
Which side of Nacos has the SQL script of this column?
This low code reporting tool is needed for data analysis
[no title] 1
Sword finger offer II 012. the sum of the left and right subarrays is equal