有个程序的某个事件会不断实时产生大量数据,产生的数据包含多个机器的数据,但是要对数据进行分类,比如判断数据是1(机器编号)的数据则写入1.log,总共有一百台机器。遇到的问题是当程序只写入20台机器的数据时,每个log文件里记录的数据是实时的,而随着机器数量的增加,log文件里的记录的数据就会有延迟(记录的是好几分钟前的数据,而不是现在的产生的数据)。请问如何程序如何设计才能使一百多个log文件记录的数据是实时的?
4条回答 默认 最新
- wanghui0380 2021-11-17 15:59关注
我写了一个单机测试demo,模拟100个生产者情况,bus选型是rx.net
nuget:System.Reactive
模拟代码
public class Program { //设计目标,不确定数量的生产者高速产生数据(其中个别生产者生产频率超高) //分门别类将数据记录到对应的log.txt中 static Subject<MsgPackage> bus = new Subject<MsgPackage>(); static void Main(string[] args) { //模拟demo,采用rx作为bus。 //暂时不考虑复杂的降权算法,我们先做一种简单策略,先用一个总体订阅判定缓存,缓存没有新建单独订阅任务,缓存有更新缓存生存期 //编写demo,我就暂时不引入缓存系统了,直接使用字典做了,如果你有兴趣用缓存系统,可以直接引入 //原本考虑使用valuetask,不过不想给demo引入过多的技术特征,还是选择了直接使用IDisposable //System.Collections.Concurrent.ConcurrentDictionary // <int, ValueTask> dic = new ConcurrentDictionary<int, ValueTask>(); System.Collections.Concurrent.ConcurrentDictionary <int, IDisposable> dic = new ConcurrentDictionary<int, IDisposable>(); bus.Subscribe(p => { //每一个数据源分开订阅处理,这样就不会因为个别生产者捣乱,让其他生产者数据处理不即使 if (!dic.ContainsKey(p.id)) { dic.GetOrAdd(p.id, msgHandler(p.id)); } }); //模拟100个生产者 for (int i = 0; i < 100; i++) { var j = i; Task.Run(async () => { var n = j + 1; while (true) { bus.OnNext(new MsgPackage() { id = j, playload = DateTime.Now.ToString() }); await Task.Delay(n * 100); //根据j延时,让生产者生产消息,很明显这里有些生产的快,有些生产的慢 } }); } Console.ReadKey(); } public static IDisposable msgHandler(int key) { string filename = $"{key}.log"; return bus.Where(p => p.id == key).Subscribe(msg => { var lines=new string[] { $"{msg.id}\t{msg.playload}"}; File.AppendAllLines(filename, lines); }); } /// <summary> /// 消息对象 /// </summary> public class MsgPackage { public int id { get; set; } public string playload { get; set; } } }
本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报