首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

C# WebSocket高并发通信阻塞问题

编程知识
2024年09月04日 17:08

项目上遇到使用WebSocket超时问题,具体情况是这样的,OTA升级过程中,解压zip文件会有解压进度事件,将解压进度通过进程通信传给另一进程,通信提示超时异常

小伙伴堂园发现大文件使用Zip解压,解压进度事件间隔竟然是1ms,简直超大频率啊

但是,解压事件超频也不应该通信异常啊,于是我通过1ms定时发送通信事件,测试了下进程间通信流程。

WebSocketSharp

当前进程间通信组件是基于kaistseo/UnitySocketIO-WebSocketSharp实现,主机内设置一服务端,多个客户端连接服务端,客户端通信由服务端转发数据。客户端A发送给B后,客户端B会将执行结果反馈给客户A。

那在定位中发现,各个链路发送延时都是正常的,包括服务端发送反馈数据给到客户端A,但客户端A接收数据延时很大,下面是部分返回数据:

并且通信时间久了之后,延时会越来越大

这里是WebSocketSharp.WebSocket对外事件OnMessage:

 1     private void WebSocketOnMessage(object sender, MessageEventArgs e)
 2     {
 3         if (!e.IsText)
 4         {
 5             //暂时不支持
 6             return;
 7         }
 8         Debug.WriteLine($"{DateTime.Now.ToString("HH:mm:ss fff")},{e.Data}");
 9 
10         var receivedMessage = JsonConvertSlim.Decode<ChannelServerMessage>(e.Data);
11         xxxxx
12     }

我们继续往下看,OnMessage是由WebSocket.message()触发,从_messageEventQueue队列中获取数据:

 1     private void message ()
 2     {
 3       MessageEventArgs e = null;
 4       lock (_forMessageEventQueue) {
 5         if (_inMessage || _messageEventQueue.Count == 0 || _readyState != WebSocketState.Open)
 6           return;
 7 
 8         _inMessage = true;
 9         e = _messageEventQueue.Dequeue ();
10       }
11 
12       _message (e);
13     }

循环接收数据是这样拿的:

 1     private void startReceiving ()
 2     {
 3       xxxx
 4       _receivingExited = new ManualResetEvent (false);
 5       Action receive = () => WebSocketFrame.ReadFrameAsync (_stream, false,
 6             frame => {
 7               if (!processReceivedFrame (frame) || _readyState == WebSocketState.Closed) {
 8                 var exited = _receivingExited;
 9                 if (exited != null)
10                   exited.Set ();
11                 return;
12               }
13               // Receive next asap because the Ping or Close needs a response to it.
14               receive ();
15               xxxx
16               message ();
17             },
18             xxxx
19           );
20       receive ();
21     }

这里我看到了ManualResetEvent。。。数据量那么大,这里搞个同步信号锁,肯定会堵住咯

为何设置线程同步锁呢?我们往下看

WebSocketSharp数据发送是基于TCPClient实现的:

1     _tcpClient = new TcpClient (_proxyUri.DnsSafeHost, _proxyUri.Port);
2     _stream = _tcpClient.GetStream ();

初始化后通过_stream.Write (bytes, 0, bytes.Length);发送数据

接收数据,也是通过_stream读取,可以看上方的startReceiving()方法里,WebSocketFrame.ReadFrameAsync (_stream, false,...)

我们知道,TCP是面向连接,提供可靠、顺序的数据流传输。用于一对一的通信,即一个TCP连接只能有一个发送方和一个接收方。具体的可以看我之前写的文章:.NET TCP、UDP、Socket、WebSocket - 唐宋元明清2188 - 博客园 (cnblogs.com)

但接收时在高并发场景下,适当的同步措施依然是必需的。我们可以使用lock也可以用SemaphoreSlim来实现复杂的同步需求,这里使用的是信号锁ManualResetEvent

我们再看看发送端代码,也是用了lock一个object来限制并发操作:

 1     private bool send (Opcode opcode, Stream stream)
 2     {
 3       lock (_forSend) {
 4         var src = stream;
 5         var compressed = false;
 6         var sent = false;
 7         xxxxx 
 8         sent = send (opcode, stream, compressed);
 9         xxxxx 
10         return sent;
11       }
12     }

所以WebSocketSharp在高并发场景下是存在通信阻塞问题的。当然,WebSocketSharp已经实现的很好了,正常的话几ms都不会遇到阻塞问题,如下设置10ms定时超频发送:

客户端A发送消息,由服务端转发至客户B,再将客户端B的反馈结果由服务端转发回客户端A,真正延时才0-2ms!

WebSocket

我们再看看原生的WebSocket,写个WebSocket通信Demo kybs00/WebSocketDemo (github.com)

服务端定时1ms使劲往客户端发送Message消息,结果竟然是:

System.InvalidOperationException:“There is already one outstanding 'SendAsync' call for this WebSocket instance. ReceiveAsync and SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.”

看来发送事件外部也要处理好高并发的场景,1ms真的是太猛了

 

 1     private SemaphoreSlim _sendLock = new SemaphoreSlim(1);
 2     private async void Timer_Elapsed(object sender, ElapsedEventArgs e)
 3     {
 4         var message = $"{DateTime.Now.ToString("HH:mm:ss fff")},hello from server";
 5 
 6         await _sendLock.WaitAsync();
 7         await BroadcastAsync("test", message);
 8         _sendLock.Release();
 9         Console.WriteLine(message);
10     }

加完信号量同步,服务端就能正常发送了。下面是客户端接收数据,传输几乎无延时:

另外,也尝试了单独在客户端接收添加信号量同步,依然是提示服务端发送不支持并行的异常。

所以原生WebSocket在发送端需要串行处理,写入完数据后再执行_stream.FlushAsync()。

 

From:https://www.cnblogs.com/kybs0/p/18395504
本文地址: http://www.shuzixingkong.net/article/1734
0评论
提交 加载更多评论
其他文章 Centos7.9安装Docker和Docker compose
什么是docker环境 Docker环境是指在计算机中安装和配置了Docker引擎的运行环境。Docker是一种容器化平台,它提供了一种轻量级的虚拟化技术,能够将应用程序及其依赖项打包成一个独立的容器,以实现快速部署、可移植性和易于管理的优势。(Docker环境提供了一种方便、可移植和隔离的方式来管
如何排查线上w3wp.exe CPU高的问题,使用到了WinDbg、Visual studio来分析IIS进程池的.dmp文件
最近发现服务器上某个web站点老是CPU很高,该站点部署在IIS上,我IIS上有多个站点,每个站点一个进程池,每个进程池取名都是根据站点来取的,所以很容易看出哪个站点吃掉的CPU,该站点已运行十几年,是基于.net 4.8 framework 编写的web站点(十几年的老项目重构的话就不用提,新项目
如何排查线上w3wp.exe CPU高的问题,使用到了WinDbg、Visual studio来分析IIS进程池的.dmp文件 如何排查线上w3wp.exe CPU高的问题,使用到了WinDbg、Visual studio来分析IIS进程池的.dmp文件 如何排查线上w3wp.exe CPU高的问题,使用到了WinDbg、Visual studio来分析IIS进程池的.dmp文件
前端使用xlsx模板导出表格
前言 前端导出表格有很多种方案,但是表格样式一旦复杂了,那么就得用代码写excel的样式,还是比较麻烦的。每次样式不一样,就得重新写,这时使用表格模板的优势就体现出来了,想导出不同样式的表格直接修改表格模板即可。 方案 我找了两种方案: 1、使用xlsx-template,利用模板语法在xlsx中占
前端使用xlsx模板导出表格 前端使用xlsx模板导出表格 前端使用xlsx模板导出表格
移动端Android跟ios兼容性问题,反人类!!!
一、查询参数编码问题 我们在日常开发中,有时候会遇到拼接参数特别多的情况,那么就会导致一行代码特别长。那么为了美观呢,有的同学会进行换行处理,如下代码: 可以看到我红色框出来的地方就是经过了手动的回车导致产生的回车换行符。这么做乍一看也挺正常是吧,但其实对于JavaScript来说,这是会被保留的。
移动端Android跟ios兼容性问题,反人类!!! 移动端Android跟ios兼容性问题,反人类!!! 移动端Android跟ios兼容性问题,反人类!!!
一个开源、跨平台的.NET UI框架 - Avalonia UI
前言 今天大姚给大家分享一个开源、免费(MIT License)、跨平台的.NET UI框架:Avalonia UI。 Avalonia是一个成熟稳定的平台,用于构建桌面、嵌入式、移动的和Web应用程序。一个代码库,无限可能!!! 项目介绍 Avalonia是一个强大的框架,使开发人员能够使用.NE
一个开源、跨平台的.NET UI框架 - Avalonia UI 一个开源、跨平台的.NET UI框架 - Avalonia UI 一个开源、跨平台的.NET UI框架 - Avalonia UI
【Azure Policy】使用deployIfNotExists 把 Azure Activity logs 导出保存在Storage Account
问题描述 使用Azure Policy,对订阅下的全部Activity Log配置Diagnostic Setting,要求: 在Subscription或Management Group级别,针对未启用Activity Log功能的订阅,启用Activity Log功能; 对已经启用了Activi
【Azure Policy】使用deployIfNotExists 把 Azure Activity logs 导出保存在Storage Account 【Azure Policy】使用deployIfNotExists 把 Azure Activity logs 导出保存在Storage Account 【Azure Policy】使用deployIfNotExists 把 Azure Activity logs 导出保存在Storage Account
每天5分钟复习OpenStack(十五)Ceph与Bcache结合
上一章我们成功部署了bcache,这一章我们将Ceph与Bcache结合来使用,使用Bcache来为ceph的数据盘提速。 1 ceph 架构 一个标准的ceph集群可能是如下的架构,SSD/NVME 存储元数据,而SATA盘存储数据。这样的架构下,物理介质的SATA盘读写速率上限决定了存储集群Ce
每天5分钟复习OpenStack(十五)Ceph与Bcache结合 每天5分钟复习OpenStack(十五)Ceph与Bcache结合 每天5分钟复习OpenStack(十五)Ceph与Bcache结合
如何调用openai的TTS模型
这是24年1月份写的了,调用代码大概率有变动,仅供参考。 1 什么是OpenAI的TTS模型 OpenAI的TTS模型是一种文本到语音(Text-to-Speech)模型,它可以将给定的文本转换为自然语音音频。TTS代表Text-to-Speech,是一种人工智能技术,它使计算机能够模拟自然语言的声
如何调用openai的TTS模型