Microservice system design -- message caching service design

2022-06-27 04:21:00 Zhuangxiaoyan

Abstract

Caching and queuing are common strategies for coping with the high concurrency and heavy load of Internet applications. A cache greatly speeds up data reads and writes, while a queue smooths out load peaks and troughs and so reduces the pressure on the system. The usual way to implement a queue is message-oriented middleware, but queuing is not its only use: it also supports asynchronous decoupling and message-driven development. This post introduces message-driven development in a microservice setting.

1. Message middleware products

There are many message-oriented middleware products. Common ones include Apache ActiveMQ, RabbitMQ, ZeroMQ, Kafka, and Apache RocketMQ, and there are many more. How to choose among them is covered by plenty of articles online (the official RocketMQ documentation includes a comparison with ActiveMQ and Kafka: http://rocketmq.apache.org/docs/motivation/), so it is not discussed here.

Wikipedia's definition of message-oriented middleware makes the scenario clear: in a distributed system, it is software or hardware that decouples components asynchronously by sending and receiving messages. It usually has three parts: the message producer, the intermediate service, and the message consumer.
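The three roles can be sketched in plain Java. This is only an in-memory illustration under simplifying assumptions, not a real middleware: a `BlockingQueue` stands in for the intermediate service, and the names `InMemoryMqSketch`, `runDemo`, and the `STOP` marker are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy illustration only: a BlockingQueue stands in for the intermediate service.
final class InMemoryMqSketch {

    // Marker that tells the consumer to stop (a convention made up for this demo).
    private static final String STOP = "__STOP__";

    // Sends `count` messages through the queue and returns what the consumer received.
    static List<String> runDemo(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Consumer: runs on its own thread and knows only the queue, not the producer.
        Thread consumer = new Thread(() -> {
            try {
                for (String msg = queue.take(); !STOP.equals(msg); msg = queue.take()) {
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer: hands messages to the queue without waiting for them to be processed.
            for (int i = 1; i <= count; i++) {
                queue.put("msg-" + i);
            }
            queue.put(STOP);
            consumer.join(); // wait so that `received` can be read safely
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3));
    }
}
```

The producer and the consumer never call each other; the queue is their only point of contact, which is the decoupling the definition describes.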

This case study is built on Spring Cloud Alibaba, and RocketMQ is part of that project set. Its strong performance across Alibaba's product lines has led more and more projects to select it, so this article uses RocketMQ as well. Let's get started with it.

2. RocketMQ basic principles

RocketMQ is Alibaba's open-source distributed message middleware, implemented in pure Java. Clustering and HA are relatively simple to set up, and message loss is low when downtime or other failures occur. Many Alibaba product lines use it, and it has run stably under heavy load. It has since been donated to the Apache open-source community, which is fairly active. Official website: http://rocketmq.apache.org/.

There are several core modules:

  • Broker is RocketMQ's core module, responsible for receiving and storing messages.
  • NameServer can be regarded as RocketMQ's registry. It manages two kinds of data: the cluster's Topic-Queue routing configuration and the Brokers' real-time configuration information. Both the Broker and the NameServer must therefore be available before messages can be produced, consumed, and delivered.
  • Producer and producer group belong to the producing side, the end that sends messages.
  • Consumer and consumer group belong to the consuming side, the end that consumes messages.
  • Topic, message, and queue are mainly used to carry message content.
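How these modules relate can be shown with a heavily simplified model. The sketch below is an assumption-laden toy, not RocketMQ's implementation: the class `RoutingSketch`, its methods, and the broker addresses are all hypothetical, and real routing also tracks queues, broker health, and much more. It only illustrates that a producer needs a route from the registry before it can send anything.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: real RocketMQ routing is far richer than this.
final class RoutingSketch {

    // "NameServer" role: topic -> addresses of the brokers that serve it.
    private final Map<String, List<String>> routes = new ConcurrentHashMap<>();

    // Counter used by the "producer" to rotate across brokers.
    private final AtomicInteger counter = new AtomicInteger();

    // "Broker" role: a broker registers the topic it serves with the registry.
    void registerBroker(String topic, String brokerAddr) {
        routes.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(brokerAddr);
    }

    // "Producer" role: look up the route first, then pick a broker round-robin.
    String selectBroker(String topic) {
        List<String> brokers = routes.get(topic);
        if (brokers == null || brokers.isEmpty()) {
            // Without a registered broker there is nowhere to send the message.
            throw new IllegalStateException("No route for topic " + topic);
        }
        int index = Math.floorMod(counter.getAndIncrement(), brokers.size());
        return brokers.get(index);
    }

    public static void main(String[] args) {
        RoutingSketch nameServer = new RoutingSketch();
        nameServer.registerBroker("park-pay-topic", "broker-a:10911"); // made-up address
        nameServer.registerBroker("park-pay-topic", "broker-b:10911"); // made-up address
        System.out.println(nameServer.selectBroker("park-pay-topic"));
        System.out.println(nameServer.selectBroker("park-pay-topic"));
    }
}
```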

2.1 Installing RocketMQ

Download the precompiled binary package, that is, the portable version that runs after unzipping, with no installer.

    $ wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
    $ unzip rocketmq-all-4.6.0-bin-release.zip
    $ cd rocketmq-all-4.6.0-bin-release/bin
    $ nohup ./mqnamesrv &
    $ nohup ./mqbroker -n localhost:9876 &

In addition, the NAMESRV_ADDR environment variable must be set, otherwise RocketMQ will not work properly. You can write it into your profile file or set it directly on the command line:

export NAMESRV_ADDR=localhost:9876

To shut down, stop the broker server first, then namesrv:

sh bin/mqshutdown broker
The mqbroker(12465) is running...
Send shutdown request to mqbroker(12465) OK
sh bin/mqshutdown namesrv
The mqnamesrv(12456) is running...
Send shutdown request to mqnamesrv(12456) OK

Test for successful installation

Open two terminals. In the first, run the message producer:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...
# Messages are written in a loop and shown below, waiting to be consumed

In the other terminal, run the consumer:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
# The messages written by the producer are printed directly below
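If the quickstart classes fail, a quick first check is whether anything is listening on the name server port at all. The following is a small sketch using only the JDK; the class and method names (`PortCheck`, `isReachable`) are made up for this example, and a successful connection only shows that some process accepts TCP connections on that port, not that it is RocketMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability check; it does not speak the RocketMQ protocol.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the name server port used in the commands above.
        System.out.println("namesrv reachable: " + isReachable("localhost", 9876, 1000));
    }
}
```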

2.2 Integrating RocketMQ into the services

When integrating RocketMQ into a Spring Cloud project, you need the Spring Cloud Stream subproject, and you should pay attention to version compatibility between the subproject and the main project. The subproject has three key concepts:

  • Destination Binders: the components that integrate with external messaging systems such as Kafka or RabbitMQ.
  • Destination Bindings: the bridge between the external messaging system and the application (the gray column in the figure).
  • Message: the message entity; producers and consumers interact with the message middleware through this data structure.

Consumer integration

The parking-message module acts as the message consumer. Add the jar dependency in pom.xml (no version is configured here; as you likely know, it is inherited from the project's dependency management):

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

Add the following items to the corresponding configuration file, application.properties:

#rocketmq config
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# The binding name "input" below must match the channel name defined in Sink
spring.cloud.stream.bindings.input.destination=park-pay-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=park-pay-group
# Whether to consume messages in order rather than concurrently; the default is false
spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true

The default message consumption channel, input, is used here. Add the @EnableBinding({Sink.class}) annotation to the startup class so that the application connects to the message broker at startup. What is Sink? It is a simple message channel definition built into the project; Sink represents where messages go. The producer side uses Source, which represents where messages come from.

Write the consumer and add the @StreamListener annotation so that it receives events from the stream and continuously processes incoming messages:

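import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ConsumerReceive {

    // Called for every message that arrives on the default input channel.
    // BusinessException is the project's own exception class (import omitted).
    @StreamListener(Sink.INPUT)
    public void receiveInput(String json) throws BusinessException {
        // For testing only; a real application could call a push interface here, such as JPush (Aurora), WeChat, or SMS
        log.info("Receive input msg = {} by RocketMQ...", json);
    }
}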

Producer integration

In the parking-charging module, when a customer's vehicle leaves the site, a message must be sent to the customer after payment to report the fee deducted, whether the payment came from a monthly-card user or a non-monthly-card user. Add the jar to the module's pom.xml as a starter:

<!-- rocketmq -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

application.properties:

#rocketmq config
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# The binding name "output" below must match the channel name defined in Source
spring.cloud.stream.rocketmq.bindings.output.producer.group=park-pay-group-user-ouput
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true

spring.cloud.stream.bindings.output.destination=park-pay-topic
spring.cloud.stream.bindings.output.content-type=application/json

Add the @EnableBinding({Source.class}) annotation to the startup class. Note that the binding key here is Source, which mirrors the Sink used on the consumer side.

Why is the consumer Sink/input while the producer is Source/output? It may look backwards at first. Think of it this way: the producer is the source of messages and outputs them, so it is output; the consumer accepts input from outside, so it is input.
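The naming is easier to see when both ends are bound to the same destination. The sketch below is a toy stand-in written with plain JDK types; it is not Spring Cloud Stream's API, the names `BindingSketch`, `bindInput`, and `bindOutput` are hypothetical, and delivery is synchronous here, which real middleware is not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy stand-in for a binder; it only shows how input and output meet at a destination.
final class BindingSketch {

    // destination name -> handlers subscribed to it
    private final Map<String, List<Consumer<String>>> destinations = new ConcurrentHashMap<>();

    // "input" binding (the Sink side): the application receives from the destination.
    void bindInput(String destination, Consumer<String> handler) {
        destinations.computeIfAbsent(destination, d -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // "output" binding (the Source side): returns a sender that publishes to the destination.
    Consumer<String> bindOutput(String destination) {
        return payload -> destinations
                .getOrDefault(destination, Collections.emptyList())
                .forEach(handler -> handler.accept(payload));
    }

    public static void main(String[] args) {
        BindingSketch binder = new BindingSketch();
        List<String> received = new ArrayList<>();

        // Consumer application: its input is bound to park-pay-topic.
        binder.bindInput("park-pay-topic", received::add);

        // Producer application: its output is bound to the same topic.
        Consumer<String> output = binder.bindOutput("park-pay-topic");
        output.accept("fee deducted");

        System.out.println(received);
    }
}
```

What leaves one application through its output arrives at the other through its input, and the shared destination name is the only thing the two need to agree on.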

Write a method that sends a message:

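    @Autowired
    Source source;

    @PostMapping("/sendTestMsg")
    public void sendTestMsg() {
        // Message here is the project's own payload class, not Spring's Message interface
        Message message = new Message();
        message.setMcontent(" This is the first message test .");
        message.setMtype(" Payment message ");
        // Serialize the payload to JSON and send it through the default output channel
        source.output().send(MessageBuilder.withPayload(JSONObject.toJSONString(message)).build());
    }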
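The `Message` payload class itself is not shown in the article. Judging only from the two setters used above and the JSON keys in the consumer's log, it could look roughly like the sketch below. This is a guess, not the author's class: the real one may have more fields, and the `toJson` helper is a hand-written stand-in added for this demo so the example runs without a JSON library.

```java
// Hypothetical payload class inferred from setMcontent/setMtype; the real class may differ.
final class Message {

    private String mcontent; // message body shown to the customer
    private String mtype;    // message category, e.g. a payment message

    public String getMcontent() { return mcontent; }
    public void setMcontent(String mcontent) { this.mcontent = mcontent; }

    public String getMtype() { return mtype; }
    public void setMtype(String mtype) { this.mtype = mtype; }

    // Demo-only JSON rendering; a real project would use its JSON library instead.
    String toJson() {
        return "{\"mcontent\":" + quote(mcontent) + ",\"mtype\":" + quote(mtype) + "}";
    }

    // Wraps a value in quotes, escaping backslashes and double quotes only.
    private static String quote(String value) {
        if (value == null) {
            return "null";
        }
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Message message = new Message();
        message.setMcontent("This is the first message test.");
        message.setMtype("Payment message");
        System.out.println(message.toJson());
    }
}
```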

Start the parking-charging and parking-message modules, then call the test method that sends a message. If everything works, the log shows:

2020-01-07 20:37:42.311  INFO 93602 --- [MessageThread_1] c.m.parking.message.mq.ConsumerReceive   : Receive input msg = {"mcontent":" This is the first message test .","mtype":" Payment message "} by RocketMQ...
2020-01-07 20:37:42.315  INFO 93602 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A800696DA018B4AAC223534ED40000 cost: 35 ms

Only the default Sink and Source interfaces are used here. When a project needs more channels, you can define your own Sink and Source interfaces: follow the same conventions as Sink and Source, and use them in place of the default classes.

// Custom Sink channel
public interface MsgSink {

    /**
     * Input channel name.
     */
    String INPUT1 = "input1";

    /**
     * @return input channel used to subscribe to messages
     */
    @Input(MsgSink.INPUT1)
    SubscribableChannel myInput();
}

// Custom Source channel
public interface MsgSource {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output1";

    /**
     * @return output channel
     */
    @Output(MsgSource.OUTPUT)
    MessageChannel output1();
}

Spring Cloud Stream integrates many messaging systems; interested readers can try others and see how they differ from RocketMQ. We have now completed an example of message-driven development through middleware, decoupling the systems asynchronously so that each can focus on its own business logic. For example, the parking-message project focuses on pushing messages out through different channels such as WeChat, SMS, email, and app push notifications.
