CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

  • A+
所属分类:.NET技术
摘要

 二、问题排查及分析过程  三、问题分析及解决过程  四、问题解决初步结果


 目录

一、测试过程及问题

 二、问题排查及分析过程

 三、问题分析及解决过程

 四、问题解决初步结果


 一、测试过程及问题

     从昨天15点左右开始测试,1个主节点,10个计算节点,1000个数据点,每个数据点3(1个实时窗口,2个延迟窗口)个数据窗口,每个数据点随时生成窗口周期和计算实例,每个数据点随时生成实时数据或历史数据。

     测试结果,由于程序无法再获得电脑的内存而停止工作,更专业的说是System. OutOfMemoryException。

     主节点,今天3点左右开始出现异常,如下:

[20-11-13 03:00:21]>>窗口0952-补发数据_CSharpFlink.Core.Window.Operator.Min-线程(0033):【2020/11/13 2:00:00-2020/11/13 3:00:00】,异常: Exception of type 'System.OutOfMemoryException' was thrown.   at System.Text.StringBuilder.ToString()    at CSharpFlink.Core.Task.MasterTaskManager.ParallelCalculate(ICalculateContext context) in CSharpFlinksrcCSharpFlink.CoreTaskMasterTaskManager.cs:line 358  [20-11-13 03:00:35]>>窗口0927-补发数据_CSharpFlink.Core.Window.Operator.Min-线程(0098):【2020/11/13 2:00:00-2020/11/13 3:00:00】,异常: Exception of type 'System.OutOfMemoryException' was thrown.   at System.Text.RegularExpressions.Match..ctor(Regex regex, Int32 capcount, String text, Int32 begpos, Int32 len, Int32 startpos)    at System.Text.RegularExpressions.RegexRunner.InitMatch()    at System.Text.RegularExpressions.RegexRunner.Scan(Regex regex, String text, Int32 textbeg, Int32 textend, Int32 textstart, Int32 prevlen, Boolean quick, TimeSpan timeout)    at System.Text.RegularExpressions.Regex.Run(Boolean quick, Int32 prevlen, String input, Int32 beginning, Int32 length, Int32 startat)    at System.Text.RegularExpressions.Match.NextMatch()    at System.Text.RegularExpressions.RegexReplacement.Replace(Regex regex, String input, Int32 count, Int32 startat)    at System.Text.RegularExpressions.Regex.Replace(String input, String replacement)    at CSharpFlink.Core.Task.MasterTaskManager.ParallelCalculate(ICalculateContext context) in CSharpFlinksrcCSharpFlink.CoreTaskMasterTaskManager.cs:line 358  [20-11-13 03:00:42]>>窗口0941-补发数据_CSharpFlink.Core.Window.Operator.Avg-线程(0085):【2020/11/13 2:00:00-2020/11/13 3:00:00】,异常: Exception of type 'System.OutOfMemoryException' was thrown.   at System.GC.AllocateNewArray(IntPtr typeHandle, Int32 length, Boolean zeroingOptional)    at System.GC.AllocateUninitializedArray[T](Int32 length)    at System.Buffers.TlsOverPerCoreLockedStacksArrayPool`1.Rent(Int32 minimumLength)    at System.Text.ValueStringBuilder.Grow(Int32 additionalCapacityBeyondPos)    at System.Text.ValueStringBuilder.Append(ReadOnlySpan`1 value)    at System.Text.RegularExpressions.RegexReplacement.Replace(Regex regex, String input, Int32 count, Int32 startat)    at System.Text.RegularExpressions.Regex.Replace(String input, String replacement)    at CSharpFlink.Core.Task.MasterTaskManager.ParallelCalculate(ICalculateContext context) in CSharpFlinksrcCSharpFlink.CoreTaskMasterTaskManager.cs:line 358 ValueStringBuilder.Append(ReadOnlySpan`1 value)    at System.Text.RegularExpressions.RegexReplacement.Replace(Regex regex, String input, Int32 count, Int32 startat)    at System.Text.RegularExpressions.Regex.Replace(String input, String replacement)    at CSharpFlink.Core.Task.MasterTaskManager.ParallelCalculate(ICalculateContext context) in CSharpFlinksrcCSharpFlink.CoreTaskMasterTaskManager.cs:line 358  [20-11-13 03:00:46]>>窗口0970-补发数据_CSharpFlink.Core.Window.Operator.Sum-线程(0074):【2020/11/13 2:00:00-2020/11/13 3:00:00】,异常: Exception of type 'System.OutOfMemoryException' was thrown.   at System.String.Concat(String str0, String str1)    at CSharpFlink.Core.Common.FileUtil.WriteAppend(String filePath, String[] contents) in CSharpFlinksrcCSharpFlink.CoreCommonFileUtil.cs:line 36    at CSharpFlink.Core.Task.MasterTaskManager.ParallelCalculate(ICalculateContext context) in CSharpFlinksrcCSharpFlink.CoreTaskMasterTaskManager.cs:line 370

 从节点,部分存活,部分异常退出,异常信息如下:

[20-11-13 02:00:38]>>任务解析异常: Exception of type 'System.OutOfMemoryException' was thrown.   at System.String.Concat(String str0, String str1)    at CSharpFlink.Core.Common.FileUtil.WriteAppend(String filePath, String[] contents) in CSharpFlinksrcCSharpFlink.CoreCommonFileUtil.cs:line 36    at CSharpFlink.Core.Task.SlaveTaskManager.AddTask(String taskMsg) in CSharpFlinksrcCSharpFlink.CoreTaskSlaveTaskManager.cs:line 138

       358行的代码:

 CalculateContext calcContext=(CalculateContext)context;

       370行的代码:

_masterCacheList.TryAdd(downTrans.Key, compressMsg);

        138行的代码:

_slaveCacheList.TryAdd(downTrans.Key, downTrans);

         masterCacheList和slaveCacheList变量是ConcurrentDictionary类。

二、问题排查及分析过程

     共性问题:记录的每处OutOfMemoryException异常信息都会涉及到对【String】的操作。

     第一步,使用dotnet-dump工具对String进行操作

     参考链接:https://docs.microsoft.com/zh-cn/dotnet/core/diagnostics/debug-memory-leak

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

       System.String有1784359个对象,为什么这么多对象呢?因为要生成计算节点的任务,这个任务要临时保存到文件目录中,把计算任务的文件发送到计算节点后,再进行删除和清空程序缓存。

      写任务文件其中涉及到FileUtil.WriteAppend()方法,这个和上面异常的日志信息是对应的,WriteAppend的代码,如下:

public static void WriteAppend(string filePath, string[] contents) {     using (FileStream fs = new FileStream(filePath, FileMode.OpenOrCreate, FileAccess.Write, FileShare.ReadWrite))     {         fs.Seek(fs.Length, SeekOrigin.Current);     string content = String.Join(Environment.NewLine, contents) + Environment.NewLine;     byte[] data = System.Text.Encoding.UTF8.GetBytes(content);     fs.Write(data, 0, data.Length);     fs.Close();      } } 注:这是很早写的代码。

         其中Join函数,可能涉及到了Concat函数,和异常信息也是对应的。

        那就奇怪了,难道using和Close没有起来关闭和释放资源的目的吗?让我们来看看FileStream的基类Stream的Dispose和 Close都做了什么?看源代码,如下图:

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

          从代码上看唯一做了SuppressFinalize函数操作,那么SuppressFinalize是什么意思呢?参见链接:

https://docs.microsoft.com/zh-cn/dotnet/api/system.gc.suppressfinalize?view=netcore-3.1

      上面链接的大概意思是:请求公共语言运行时不要调用指定对象的终结器。也就是说继承了IDisposable接口,就不再调用类的析构函数了,那析构函数做了什么呢?如下图:

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

           我们分析至此,Dispose和Close相当于什么都没有做。那只能依赖GC来清理资源了。那在高并发下操作FileStream,Dispose和Close不起作用的情况下,难道GC没有及时回收资源?看来有可能是这个问题。

三、问题分析及解决过程

       但是怎么解决这个问题呢?记得FileStream类有一个Flush函数,具体操作函数代码,如下图:

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

         Flush函数主要调用了FlushOSBuffer函数,代码如下图:

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

         没有找到FlushFileBuffers函数,调用的函数,如下图:

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。

         这是非托管的代码,函数参考链接:https://docs.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-flushfilebuffers。大致意思是立即把数据写到磁盘文件中,但是没有找到该函数的源代码。

       不管源代码的事了,修改一下WriteAppend函数,加上Flush测试一下,代码如下:

using (FileStream fs = new FileStream(filePath, FileMode.OpenOrCreate, FileAccess.Write, FileShare.ReadWrite)) {     fs.Seek(fs.Length, SeekOrigin.Current); string content = String.Join(Environment.NewLine, contents) + Environment.NewLine; byte[] data = System.Text.Encoding.UTF8.GetBytes(content); fs.Write(data, 0, data.Length); fs.Flush();    //新增加代码。 fs.Close(); }

 四、问题解决初步结果

     上午10点部署,测试到下午15点,总共5个小时左右的时间。内存使用情况,主节点基本维持在:380 MB(1000数据点,每个数据点有3个数据窗口,如果1个窗口,应该在130 MB左右),子节点基本维持在:150 MB。有一段时间,内存会逐步增涨,但是某个时间点内存会释放到基本情况,曲线呈现正弦波趋势。内存使用情况,如下图:

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。


物联网&大数据技术 QQ群:54256083

物联网&大数据合作 QQ群:727664080

网站:http://www.ineuos.net

联系QQ:504547114

合作微信:wxzz0151

官方博客:https://www.cnblogs.com/lsjwq

iNeuOS工业互联网操作系统 公众号

CSharpFlink分布式实时计算,OutOfMemoryException异常,你意想不到的原因。