当前位置:网站首页>C#&. Net to implement a distributed event bus from 0 (1)
C#&. Net to implement a distributed event bus from 0 (1)
2022-06-21 13:42:00 【Guo Mahua】
The event bus can be viewed from the global perspective of the system , Provide a set of publish and subscribe mechanism . Have used ABP Framework development , Or studied DDD, Or understand Rebus Classmate , You should be aware of the many benefits of applying an event bus , And it can bring us the improvement of program design thought .
Source code &Nuget
I learned some open source component libraries , And from their own experience , from 0 A design based on RabbitMq Distributed event bus , You can come to my Azure Devops Warehouse View source code .
And I published it as Nuget package , It can be installed and used by everyone
Install-Package MaH.EventBus -Version 0.7.0Let's start with a simple application :
// Define an order update event
namespace MaH.SimpleLocalEventBus.Console
{
public class UpdateOrderEvent
{
public DateTime Timestamp { get; set; }
public UpdateOrderEvent()
{
Timestamp = DateTime.Now;
}
}
}// Create an order update event handler , Inherit :IEventHandler<UpdateOrderEvent>
namespace MaH.SimpleLocalEventBus.Console
{
public class UpdateOrderHandler : IEventHandler<UpdateOrderEvent>
{
public Task InvokeAsync(UpdateOrderEvent eventData)
{
System.Console.WriteLine($"Update-{eventData.Timestamp}");
return Task.CompletedTask;
}
}
}
using System.Threading.Tasks;
// stay EventBus Register handlers in , When using eventBus.PublishAsync() After publishing an event , The handler will be automatically triggered according to the event type
namespace MaH.SimpleLocalEventBus.Console
{
class Program
{
static async Task Main(string[] args)
{
ILocalEventBus eventBus = new LocalEventBus();
UpdateOrderHandler updateOrderHandler = new UpdateOrderHandler();
eventBus.Subscribe(updateOrderHandler);
await eventBus.PublishAsync(new UpdateOrderEvent());
System.Console.ReadKey();
}
}
}
Local event bus
The above example shows a simple application of the local event bus , The implementation method is also very basic , You can go to MaH.EventBus Of Basic-1.0 Branch to view the specific implementation .
public class LocalEventBus : ILocalEventBus
{
private ConcurrentDictionary<Type, List<IHandler>> handlers;
public LocalEventBus()
{
handlers = new ConcurrentDictionary<Type, List<IHandler>>();
}
public async Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
{
await TriggerEventAsync(GetOrAddHandlers(typeof(TEvent)), eventData);
}
public void Subscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
GetOrAddHandlers(typeof(TEvent)).Add(handler);
}
}
We can notice that , The implementation here depends on the registration of specific Handler Instance object , If we want to control handler Life cycle of , Or say , We hope handler It can be downloaded from IOC Get... In the container , What should be done ?
The answer is to introduce the factory model , Use the factory method to obtain handler.
from IOC Get... In the container Handler
We can switch the branch to MaH.EventBus Of Basic-2.0 To see the implementation :
// LocalEventBus No longer save directly handler References to , Instead, save the corresponding IEventHandlerFactory
public class LocalEventBus : ILocalEventBus
{
private ConcurrentDictionary<Type, List<IEventHandlerFactory>> handlers;
public LocalEventBus()
{
handlers = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
}
public void Subscribe(Type eventType, IEventHandlerFactory handlerFactory)
{
GetOrAddHandlers(eventType).Add(handlerFactory);
}
}namespace MaH.SimpleLocalEventBus
{
public interface IEventHandlerFactory
{
IHandler GetHandler();
}
}I made two IEventHandlerFactory The implementation of the , One is SingletonEventHandlerFactory , Direct maintenance for handler Instance reference :
public class SingletonEventHandlerFactory : IEventHandlerFactory
{
private readonly IHandler _handlerInstance;
public SingletonEventHandlerFactory(IHandler handlerInstance)
{
_handlerInstance = handlerInstance;
}
public IHandler GetHandler() => _handlerInstance;
}The other is based on IOC Containers , preservation handler type And for IServiceScopeFactory References to :
public class DefaultEventHandlerFactory : IEventHandlerFactory
{
private readonly Type _handlerType;
private readonly IServiceScopeFactory _serviceScopeFactory;
public DefaultEventHandlerFactory(Type handlerType, IServiceScopeFactory serviceScopeFactory)
{
_handlerType = handlerType;
_serviceScopeFactory = serviceScopeFactory;
}
public IHandler GetHandler()
{
var scope = _serviceScopeFactory.CreateScope();
return (IHandler) scope.ServiceProvider.GetRequiredService(_handlerType);
}
}such , We can use it IOC technology .NET The event bus is used in the project ! Of course , A good event bus framework should also provide an automatic subscription mechanism , There is no implementation here .
public void ConfigureServices(IServiceCollection services)
{
services.AddTransient<WeatherForecastHandler>();
services.AddSingleton<ILocalEventBus>(sp =>
{
var eventBus = new LocalEventBus();
eventBus.Subscribe(typeof(WeatherUpdateEvent), new DefaultEventHandlerFactory(typeof(WeatherForecastHandler), sp.GetRequiredService<IServiceScopeFactory>()));
return eventBus;
});
}Distributed event bus
Simply implementing a local event bus is not enough , Because in microservice design , It often involves communication between multiple systems , For example, the payment system issues an order payment notice , SMS sites need to send payment success messages according to the order information , wait .
therefore , We need to further upgrade the current event bus design , With the help of RabbitMQ, To implement a distributed event bus . Of course , You can also use Kafka, Database etc. .
边栏推荐
- Questions and answers No. 43: application performance probe monitoring principle PHP probe
- May the mountains and rivers be safe
- scrapy_ Redis distributed crawler
- 分布式事务,原理简单,写起来全是坑
- Lamp architecture 4 -- MySQL source code compilation and use
- MySQL - index
- Babbitt yuancosmos daily must read: wechat may ban a official account for the first time on the grounds of "involving secondary transactions in digital collections", and the new regulations of the pla
- Cvpr2022 | the action sequence verification task was first proposed by X xiaohongshu of Shanghai University of science and technology, which can be applied to multiple scenarios such as scoring of spo
- What is Devops in an article?
- Qinglong panel, JD timed task library, script library
猜你喜欢

分布式事务,原理简单,写起来全是坑

Navigation bar switching, message board, text box losing focus

Tami dog sharing: the way of property right transaction and the significance of data-based property right transaction market

3000 frame animation illustrating why MySQL needs binlog, redo log and undo log

5000 word analysis: the way of container security attack and defense in actual combat scenarios

Lamp architecture 5 - MySQL Cluster and master-slave structure

Use map set or list set to store list set

How to write test cases
![[deeply understand tcapulusdb technology] tmonitor background one click installation](/img/0a/742503e96a9b51735f5fd3f598b9af.png)
[deeply understand tcapulusdb technology] tmonitor background one click installation

IMU selection, calibration error analysis, AHRS integrated navigation
随机推荐
The xdd-plus login prompt on the Qinglong panel is "the current login environment is abnormal. To ensure the security of your account, you cannot log in temporarily. It is recommended to connect the t
Numpy | insert variable length character array test OK
17 commonly used o & M monitoring systems
5000 word analysis: the way of container security attack and defense in actual combat scenarios
【深入理解TcaplusDB技术】TcaplusDB导入数据
7. pointer
Swift return button
Convert DICOM format to nii GZ file
基于STM32电压检测和电流检测
[deeply understand tcapulusdb technology] tcapulusdb import data
Setting of Seaborn drawing style
C language elementary order (VI) function
8. structure
How to use search engine?
Leetcode height checker
Kotlin - i/o flow
Are you still using generator to generate crud code of XXX management system? Let's see what I wrote
Cvpr2022 | the action sequence verification task was first proposed by X xiaohongshu of Shanghai University of science and technology, which can be applied to multiple scenarios such as scoring of spo
Master the basic usage of SQLite3
MySQL - view properties