一个简易socket通信结构

  • 一个简易socket通信结构已关闭评论
  • 295 次浏览
  • A+
所属分类:.NET技术
摘要

工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。


服务端

工作需要又需要用到socketTCP通讯,这么多年了,终于稍微能写点了。让我说其实也说不出个啥来,看了很多的异步后稍微对异步socket的导流 endreceive后 再beginreceive 形成一个内循环有了个认识,加上我自己的封包拆包机制,然后再仿那些其它的大多数代码结构弄点onReceive事件进行 收包触发。整个过程就算差不多了 ,基本上是能够可靠运行的 靠谱的 中规中矩的,要说啥创新读到嘛真的谈不上。代码中写了很多low逼注释 也是为了方便自己理解 请无视。下面是server端代码,使用异步机制accept 异步receive ,成员有 clients代表当前在线的客户端 客户端socket包装为EndpointClient ,有onClientAddDel 代表客户端上线掉线事件,有onReceive代表所有客户端的收包事件,clients由于是异步的多线程访问就要涉及多线程管控 所以使用lock ,服务端有sendToAll() 和SendToSomeOne()毫无疑问这也是通过调用特定的clients来做的。

以下是服务端代码

  1 public class MsgServerSchedule   2 {   3    4    5     Socket serverSocket;   6     public Action<List<string>> onClientAddDel;   7     public Action<Telegram_Base> onReceive;   8     bool _isRunning = false;   9   10       11     int port;  12   13     public TelgramType telType;  14   15     static List<EndpointClient> clients;  16   17     public bool isRunning { get { return _isRunning; } }  18     public MsgServerSchedule(int _port)  19     {  20         //any 就决定了 ip地址格式是v4  21         //IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 7654);  22         //socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);  23   24         this.port = _port;  25   26         clients = new List<EndpointClient>();  27   28         Console.WriteLine("constructor");  29   30     }  31   32     public void Start()  33     {  34         try  35         {  36             IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);  37             serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);  38             serverSocket.Bind(endPoint);  39             serverSocket.Listen(port);  40   41             serverSocket.BeginAccept(new AsyncCallback(AcceptCallback), serverSocket);  42   43             _isRunning = true;  44             Console.WriteLine("start");  45         }  46         catch (Exception ex)  47         {  48             _isRunning = false;  49             serverSocket = null;  50   51             Console.WriteLine("服务启动出现错误,可能端口已被占用:"+port);  52             Console.WriteLine(ex.Message);  53         }  54          55     }  56   57     public void Stop()  58     {  59         for (int i = 0; i < clients.Count; i++)  60         {  61             clients[i].Close();                  62         }  63         ClientAddDelGetList(null, EndPointClientsChangeType.ClearAll);  64         serverSocket.Close();  65         _isRunning = false;  66     }  67   68     public void SendToAll(Telegram_Base tel)  69     {  70         for (int i = 0; i < clients.Count; i++)  71         {  72             clients[i].Send(tel);  73         }  74     }  75   76     public void SendToSomeOne(Telegram_Base tel)  77     {  78         for (int i = 0; i < clients.Count; i++)  79         {  80             if(clients[i].remoteIPPort==tel.remoteIPPort)  81             {  82                 clients[i].Send(tel);  83                 break;  84             }  85         }  86     }  87   88     //新增与删除客户端 秉持原子操作  89     List<string> ClientAddDelGetList(EndpointClient cli, EndPointClientsChangeType changeType)  90     {  91         //异步同时有新客户端上线 与下线 不进行资源互斥访问会报错  92         lock (this)  93         {  94             if (changeType == EndPointClientsChangeType.Add)  95             {  96                 clients.Add(cli);  97             }  98             else if(changeType== EndPointClientsChangeType.Del)  99             { 100                 var beRemoveClient = clients.First(r => r.remoteIPPort == cli.remoteIPPort); 101                 if (beRemoveClient != null) 102                     clients.Remove(beRemoveClient); 103             } 104             else if(changeType== EndPointClientsChangeType.ClearAll) 105             { 106                 clients.Clear(); 107             } 108             else if (changeType == EndPointClientsChangeType.GetAll) 109             { 110                 List<string> onLines = new List<string>(clients.Count); 111                 for (int i = 0; i < clients.Count; i++) 112                 { 113                     onLines.Add(clients[i].remoteIPPort); 114                 } 115                 return onLines; 116             } 117             else 118             { 119                 return null; 120             } 121         } 122         return null; 123     } 124     //异步监听客户端 有客户端到来时的回调 125     private void AcceptCallback(IAsyncResult iar) 126     { 127         //server端一直在receive 能够感知到客户端掉线 (连上后 关闭客户端 server立即有错误爆出) 128         //但是同情况 关闭server端 客户端无错误爆出 直到点发送 才有错误爆出 129         //由此得出 处于receive才会有掉线感知  ,send时发现发不出去自然也会有感知 跟人的正常思维理解是一样的 130         //虽然tcp是所谓的长连接 ,通过反复测试  ->但是双方相互都处在一个静止状态 是无法 确定在不在的   131         //连上后平常的情况下 并没有数据流通 的 ,双方只是一个状态的保持而已。 132         //这也是为什么 好多服务 客户端 程序 都有个心跳机制(由此我们可以想到继续完善 弄个客户端列表 心跳超时的剔除列表 正常发消息send 或receive 异常的剔除列表 删除clientSocket 133         //其实非要说吧 一般情况 服务端一直在receive 不用心跳其实也是可以的(客户端可能是真的需要 134         //tcp底层就已经有了一个判断对方在不在的机制 , 对方直接关程序 结束进程 这些 只要tcp在receive就立即能够感知 所以说心跳 用不用看情况吧 135  136         //tcp 不会丢包 哪怕是连接建立了   你还没开始receive   对方却先发了, 137         //对方只要是发了的数据 都由操作系统像个缓存样给你放那的 不会掉 你再隔10秒开始receive都能rec的到 138  139         //tcp甚至在拔掉网线 再重新插上 都可以保证数据一致性 140         //tcp的包顺序能够保证 先发的先到 141  142         //nures代码中那些beginreceivexxx  异步receive的核心机制就是 ,假定数据到的时候把数据保存到xxx数组 143         //真正endreceive的时候 其实数据已经接收 处理完成了 144  145         try 146         { 147  148             //处理完当前accept 149             Socket currentSocket = serverSocket.EndAccept(iar); 150  151             EndpointClient client = new EndpointClient(currentSocket,telType); 152  153             //新增客户端 154             ClientAddDelGetList(client, EndPointClientsChangeType.Add); 155              156             if (onClientAddDel != null) 157             { 158                 List<string> onlines = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll); 159                 onClientAddDel(onlines); 160  161                 //客户端异常掉线 162                 client.onClientDel = new Action<string>((_remoteIPPort) => 163                 { 164                     ClientAddDelGetList(new EndpointClient(){ remoteIPPort=_remoteIPPort} , EndPointClientsChangeType.Del); 165  166                     List<string> onlines2 = ClientAddDelGetList(null, EndPointClientsChangeType.GetAll); 167                     onClientAddDel(onlines2); 168                 }); 169             } 170  171              172  173             //这句到时调用完成后 就会自动把 receivebuffer填充 //要接收的字节数 系统底层操作一次接收多少字节 其实意义不大 174             //总是从0开始(就是说并发时会覆盖 175  176             Console.WriteLine(string.Format("new client ->{0}", currentSocket.RemoteEndPoint.ToString())); 177  178             //currentSocket.Close(); 179             //Application.Exit(); 180  181             //Thread.Sleep(1000 * 10); 182             client.onReceive += this.onReceive; 183  184             client.BeginReceive(); 185  186  187             //立即开始accept新的客户端 188             if (isRunning == true && serverSocket != null) 189                 serverSocket.BeginAccept(AcceptCallback, serverSocket); 190             //beginAccept 最开始的方法可以不一样 ,但最终肯定是一个不断accept的闭环过程 191             //整个过程就像个导流样 ,最开始用异步导流到一个固定的点 然后让其循环源源不断运转 192  193             //加asynccallback 有什么不一样么 194             //socket.BeginAccept(new AsyncCallback( AcceptCallback), socket); 195  196         } 197         catch (Exception ex) 198         { 199             Console.WriteLine("AcceptCallback Error"); 200             Console.WriteLine(ex.Message); 201         } 202  203     } 204  205     206 }

EndpointClient终端代码代表客户端的对口人,他的onReceive 等资源从服务端继承 ,如果服务端想给某个特定客户端发数据则会调用他们中的某一个 毫无疑问这是通过remoteIPport来判断的,这些都是编写基本socket结构轻车熟路的老套路

以下EndpointClient代码

  1 public class EndpointClient   2 {   3     Socket workingSocket;   4     static int receiveBufferLenMax = 5000;   5     byte[] onceReadDatas = new byte[receiveBufferLenMax];   6     List<byte> receiveBuffer = new List< byte>(receiveBufferLenMax);   7    8     public string remoteIPPort { get; set; }   9       10     //当前残留数据区 接收数据的起始指针(也代表缓冲区数据长度  11     int receiveBufferLen = 0;  12   13   14     TelgramType telType;  15   16     public Action<Telegram_Base> onReceive;  17     public Action<string> onClientDel;  18   19     public EndpointClient()  20     {  21   22     }  23     public EndpointClient(Socket _socket,TelgramType _telType)  24     {  25         this.remoteIPPort = _socket.RemoteEndPoint.ToString();  26         this.telType = _telType;  27         workingSocket = _socket;  28     }  29   30     public void Send(Telegram_Base tel)  31     {  32         //try  33         //{  34             if(workingSocket==null)  35             {  36                 Console.WriteLine("未初始化的EndpointClient");  37                 return;  38             }  39             if (tel is Telegram_Schedule)  40             {  41                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;  42                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)  43                 {  44                     Console.WriteLine("尝试发送数据长度格式错误的报文");  45                     return;  46                 }  47   48                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;  49                 byte[] sendbytes = telBeSend.dataBytes;  50   51                 //数据超过缓冲区长度 会导致无法拆包  52                 if (sendbytes.Length <= receiveBufferLenMax)  53                 {  54                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);  55                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0,null,null  56                       57                     );  58                 }  59                 else  60                 {  61                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");  62                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");  63                 }  64   65             }  66             else if (tel is Telegram_SDBMsg)  67             {  68   69             }  70   71         //}  72         //catch (Exception ex)  73         //{  74   75         //    Console.WriteLine(ex.Message);  76         //    throw ex;  77         //}  78     }  79   80     public void BeginReceive()  81     {  82         if (workingSocket == null)  83         {  84             Console.WriteLine("未初始化的EndpointClient");  85             return;  86         }  87   88         receiveBufferLen = 0;  89         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None,  90             ReceiveCallback,  91         this);  92     }  93     private void ReceiveCallback(IAsyncResult iar)  94     {  95         try  96         {  97             EndpointClient cli = (EndpointClient)iar.AsyncState;  98             Socket socket = cli.workingSocket;  99             int reads = socket.EndReceive(iar); 100  101             if (reads > 0) 102             { 103  104                 for (int i = 0; i < reads; i++) 105                 { 106                     receiveBuffer.Add(onceReadDatas[i]); 107                 } 108  109                 //具体填充了多少看返回值 此时 数据已经在buffer中了 110                 receiveBufferLen += reads; 111                 //加完了后解析 阻塞式处理 结束后开启新的接收 112                 SloveTelData(); 113  114                 if (receiveBufferLenMax - receiveBufferLen > 0) 115                 { 116                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收) 117                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this); 118                 } 119                 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了 120                 { 121                     Close(); 122                     //移除自己 123                     if (onClientDel != null) 124                     { 125                         onClientDel(remoteIPPort); 126                     } 127                     Console.WriteLine("服务端接口解析数据出现异常"); 128                     throw new Exception("服务端接口解析数据出现异常"); 129                 } 130             } 131             else//reads==0 客户端已关闭 132             { 133                 Close(); 134                 //移除自己 135                 if (onClientDel != null) 136                 { 137                     onClientDel(remoteIPPort); 138                 } 139             } 140         } 141         catch (Exception ex) 142         { 143             Close(); 144             //移除自己 145             if (onClientDel != null) 146             { 147                 onClientDel(remoteIPPort); 148             } 149  150             Console.WriteLine("ReceiveCallback Error"); 151             Console.WriteLine(ex.Message); 152         } 153  154     } 155     void SloveTelData() 156     { 157         //进行数据解析 158         SloveTelDataUtil slo = new SloveTelDataUtil(); 159          160         if (telType == TelgramType.Schedule) 161         { 162             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen, this.remoteIPPort); 163             //buffer已经被处理一遍了 使用新的长度 164             receiveBufferLen = receiveBuffer.Count; 165             //解析出的每一个对象都触发 onreceive 166             for (int i = 0; i < dataEntitys.Count; i++) 167             { 168                 if (onReceive != null) 169                     onReceive(dataEntitys[i]); 170             } 171         } 172         else if (telType == TelgramType.SDBMsg) 173         { 174  175         } 176  177     } 178  179     180     public void Close() 181     { 182         try 183         { 184             receiveBuffer.Clear(); 185             receiveBufferLen = 0; 186             if (workingSocket != null && workingSocket.Connected) 187                 workingSocket.Close(); 188         } 189         catch (Exception ex) 190         { 191             Console.WriteLine(ex.Message); 192         } 193          194     } 195 }

数据拆包与封包粘包处理

上面的代码可以看到 数据包处理都在receiveCallback里 SloveTelData,也是通用的套路 ,解析到完整的包后从缓冲区移除 解析多少个包触发多少次事件,残余数据留在缓冲区 然后继续开始新的beginReceive往缓冲区加。在异步机制中 到达endReceive的时候数据已经在缓冲区里了,这个自不用多说噻。数据包和粘包逻辑在公共类库里供客户端服务端共同调用

以下是粘包处理逻辑

  1 public class SloveTelDataUtil   2 {   3     List<Telegram_Schedule> solveList;   4     public SloveTelDataUtil()   5     {   6     }   7        8     List<byte> buffer;   9     int bufferLen;  10     int bufferIndex = 0;  11     string remoteIPPort;  12     public List<Telegram_Schedule> Slove_Telegram_Schedule( List< byte> _buffer,int _bufferLen,string _remoteIPPort)  13     {  14   15         solveList = new List<Telegram_Schedule>();  16   17         bufferIndex = 0;  18   19         buffer = _buffer;  20         bufferLen = _bufferLen;  21         remoteIPPort = _remoteIPPort;  22   23         //小于最小长度 直接返回  24         if (bufferLen < 12)  25             return solveList;  26   27         //进行数据解析  28         bool anaysisOK = false;  29         while (anaysisOK=AnaysisData_Schedule()==true)//直到解析的不能解析为止  30         {                  31         }  32         return solveList;  33     }  34   35     public bool AnaysisData_Schedule()  36     {  37         if (bufferLen - bufferIndex < GlobalSymbol.Headerlen)  38             return false;  39   40         //解析出一个数据对象  41         Telegram_Schedule telObj = new Telegram_Schedule();  42   43         //必定是大于最小数据大小的  44         telObj.dataBytesHeader = new byte[GlobalSymbol.Headerlen];  45         buffer.CopyTo(bufferIndex, telObj.dataBytesHeader, 0, GlobalSymbol.Headerlen);  46   47         byte[] btsHeader = new byte[4];  48         byte[] btsCommand = new byte[4];  49         byte[] btsLen = new byte[4];  50   51         btsHeader[0] = buffer[bufferIndex];  52         btsHeader[1] = buffer[bufferIndex+1];  53         btsHeader[2] = buffer[bufferIndex+2];  54         btsHeader[3] = buffer[bufferIndex+3];  55   56         bufferIndex += 4;  57   58         btsCommand[0] = buffer[bufferIndex];  59         btsCommand[1] = buffer[bufferIndex + 1];  60         btsCommand[2] = buffer[bufferIndex + 2];  61         btsCommand[3] = buffer[bufferIndex + 3];  62   63         bufferIndex += 4;  64   65         btsLen[0] = buffer[bufferIndex];  66         btsLen[1] = buffer[bufferIndex + 1];  67         btsLen[2] = buffer[bufferIndex + 2];  68         btsLen[3] = buffer[bufferIndex + 3];  69   70         bufferIndex += 4;  71   72           73   74         int dataLen = BitConverter.ToInt32(btsLen, 0);  75         telObj.header = BitConverter.ToUInt32(btsHeader, 0);  76         telObj.command = BitConverter.ToInt32(btsCommand, 0);  77         telObj.remoteIPPort = remoteIPPort;  78   79         if(dataLen>0)  80         {  81             //数据区小于得到的数据长度 说明数据部分还没接收到 不删除缓冲区 不做任何处理  82             //下次来了连着头一起解析  83             if (bufferLen - GlobalSymbol.Headerlen < dataLen)  84             {  85   86                 bufferIndex -= 12;//  87   88   89                 return false;  90   91             }  92             else  93             {  94   95                 telObj.dataLen = dataLen;  96                 telObj.dataBytes = new byte[dataLen];  97                 buffer.CopyTo(bufferIndex, telObj.dataBytes, 0, dataLen);  98                   99                 solveList.Add(telObj); 100                 //bufferIndex += dataLen; 101  102                 //解析成功一次 移除已解析的 103                 for (int i = 0; i < GlobalSymbol.Headerlen+dataLen; i++) 104                 { 105                     buffer.RemoveAt(0); 106                 } 107                 bufferIndex = 0; 108                 bufferLen = buffer.Count; 109                 return true; 110             } 111         } 112         else 113         { 114              115             telObj.dataLen = 0; 116             solveList.Add(telObj); 117             //bufferIndex += 0; 118             //解析成功一次 移除已解析的 119             for (int i = 0; i < GlobalSymbol.Headerlen; i++) 120             { 121                 buffer.RemoveAt(0); 122             } 123             //解析成功一次因为移除了缓冲区 bufferIndex置0 124             bufferIndex = 0; 125             bufferLen = buffer.Count; 126             return true; 127         } 128  129     } 130  131      132     public List<Telegram_SDBMsg> Slove_Telegram_SDBMsg(ref byte[] buffer) 133     { 134         return new List<Telegram_SDBMsg>(); 135     } 136 }

我们看到用到的数据包对象是Telegram_Schedule ,中间保存有报文数据,数据发送的目标等信息。

以下是数据包结构代码

 1 public class Telegram_Base  2 {  3     public string remoteIPPort { get; set; }  4     //数据内容  5     public byte[] dataBytes { get; set; }  6     //头部内容的序列化  7     public byte[] dataBytesHeader { get; set; }  8   9     public string jsonStr { get; set; } 10     virtual public void SerialToBytes() 11     { 12  13     } 14  15     virtual public void SloveToTel() 16     { 17  18     } 19  20 } 21  22 public class Telegram_Schedule:Telegram_Base 23 { 24      25     //头部标识 4字节 26     public UInt32 header { get; set; } 27     //命令对应枚举的 int 4字节 28     public int command { get; set; } 29     //数据长度 4字节 30     public int dataLen { get; set; } 31  32      33  34     override public void SerialToBytes() 35     { 36         //有字符串数据 但是待发送字节是空 37         if ((string.IsNullOrEmpty(jsonStr) == false ))//&& (dataBytes==null || dataBytes.Length==0) 38         { 39             dataBytes = Encoding.UTF8.GetBytes(jsonStr); 40             dataLen = dataBytes.Length; 41             dataBytesHeader = new byte[GlobalSymbol.Headerlen]; 42            43             header = GlobalSymbol.HeaderSymbol; 44              45             byte[] btsHeader = BitConverter.GetBytes(header); 46             byte[] btsCommand = BitConverter.GetBytes(command); 47             byte[] btsLen = BitConverter.GetBytes(dataLen); 48  49             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4); 50             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4); 51             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4); 52  53         } 54         else if((string.IsNullOrEmpty(jsonStr) == true )&& (dataBytes==null || dataBytes.Length==0)){ 55             dataLen = 0; 56             dataBytes = new byte[0]; 57  58             dataBytesHeader = new byte[GlobalSymbol.Headerlen]; 59  60             header = GlobalSymbol.HeaderSymbol; 61  62             byte[] btsHeader = BitConverter.GetBytes(header); 63             byte[] btsCommand = BitConverter.GetBytes(command); 64             byte[] btsLen = BitConverter.GetBytes(dataLen); 65  66             Array.Copy(btsHeader, 0, dataBytesHeader, 0, 4); 67             Array.Copy(btsCommand, 0, dataBytesHeader, 4, 4); 68             Array.Copy(btsLen, 0, dataBytesHeader, 8, 4); 69         } 70     } 71  72     override public void SloveToTel() 73     { 74         //只解析字符串数据部分 ,header 和len 在接收之初就已解析 75         this.jsonStr = Encoding.UTF8.GetString(this.dataBytes); 76     } 77  78 }

客户端代码

最后是客户端,有了上面的结构,客户端就不足为谈了,稍微了解socket的人都熟知套路的 基本跟EndpointClient一致

  1 public class MsgClientSchedule   2 {   3     Socket workingSocket;   4     //缓冲区最大数据长度   5     static int receiveBufferLenMax = 5000;   6     //单次receive数据(取决于tcp底层封包 但是不会超过缓冲区最大长度   7     byte[] onceReadDatas = new byte[receiveBufferLenMax];   8     //未解析到完整数据包时的残余数据保存区   9     List<byte> receiveBuffer = new List<byte>(receiveBufferLenMax);  10   11     string serverIP { get; set; }  12     int serverPort { get; set; }  13     public string localIPPort { get; set; }  14   15     //残余缓冲区数据长度  16     int receiveBufferLen = 0;  17   18     bool _isConnected { get; set; }  19   20     TelgramType telType;  21   22     //收一个包时触发  23     public Action<Telegram_Base> onReceive;  24     //与服务端断链时触发  25     public Action<string> onClientDel;  26   27   28     public bool isConnected { get { return _isConnected; } }  29     public MsgClientSchedule(string _serverIP,int _port)  30     {  31         serverIP = _serverIP;  32         serverPort = _port;  33         _isConnected = false;  34     }  35   36     public void Connect()  37     {  38         try  39         {  40             workingSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.IP);  41             IPEndPoint ipport = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);  42             workingSocket.Connect(ipport);  43   44             localIPPort = workingSocket.LocalEndPoint.ToString();  45             _isConnected = true;  46             BeginReceive();  47         }  48         catch (Exception ex)  49         {  50             workingSocket = null;  51             _isConnected = false;  52   53             Console.WriteLine(ex.Message);  54         }  55   56     }  57   58   59   60   61     public void Send(Telegram_Base tel)  62     {  63         try  64         {  65             if(_isConnected==false)  66             {  67                 Console.WriteLine("未连接到服务器");  68                 return;  69             }  70             if (tel is Telegram_Schedule)  71             {  72                 Telegram_Schedule telBeSend = tel as Telegram_Schedule;  73                 if (telBeSend.dataBytes.Length != telBeSend.dataLen)  74                 {  75                     Console.WriteLine("尝试发送数据长度格式错误的报文");  76                     return;  77                 }  78                 byte[] sendBytesHeader = telBeSend.dataBytesHeader;  79                 byte[] sendbytes = telBeSend.dataBytes;  80   81                 //数据超过缓冲区长度 会导致无法拆包  82                 if (sendbytes.Length <= receiveBufferLenMax)  83                 {  84                     workingSocket.BeginSend(sendBytesHeader, 0, sendBytesHeader.Length, 0, null, null);  85                     workingSocket.BeginSend(sendbytes, 0, sendbytes.Length, 0, null, null  86                           87                     );  88                 }  89                 else  90                 {  91                     Console.WriteLine("发送到调度客户端的数据超过缓冲区长度");  92                     throw new Exception("发送到调度客户端的数据超过缓冲区长度");  93                 }  94   95                   96             }  97             else if (tel is Telegram_SDBMsg)  98             {  99  100             } 101  102         } 103         catch (Exception ex) 104         { 105             Close(); 106             Console.WriteLine(ex.Message); 107             //throw ex; 108         } 109     } 110  111     public void BeginReceive() 112     { 113         receiveBufferLen = 0; 114         workingSocket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax, SocketFlags.None, 115             ReceiveCallback, 116              117         this); 118     } 119     private void ReceiveCallback(IAsyncResult iar) 120     { 121         try 122         { 123             MsgClientSchedule cli = (MsgClientSchedule)iar.AsyncState; 124             Socket socket = cli.workingSocket; 125             int reads = socket.EndReceive(iar); 126  127             if (reads > 0) 128             { 129  130                 for (int i = 0; i < reads; i++) 131                 { 132                     receiveBuffer.Add(onceReadDatas[i]); 133                 } 134  135                 //具体填充了多少看返回值 此时 数据已经在buffer中了 136  137                 receiveBufferLen += reads; 138  139                 //加完了后解析 阻塞式处理 结束后开启新的接收 140                 SloveTelData(); 141  142  143  144                 if (receiveBufferLenMax - receiveBufferLen > 0) 145                 { 146                     //接收完了 继续beginreceive 开启异步的下次接收 (如果缓冲区有残留数据 则接收长度变短 ,没接收到的让其留在socket不会丢失 下次接收) 147                     socket.BeginReceive(onceReadDatas, 0, receiveBufferLenMax - receiveBufferLen, SocketFlags.None, ReceiveCallback, this); 148                 } 149                 else//阻塞式处理都完成一遍了 都还没清理出任何缓冲区空间 毫无疑问 整体运转机制已经挂了 不用beginreceive下一次了 150                 { 151                     Close(); 152                      153                     Console.WriteLine("服务端接口解析数据出现异常"); 154                     throw new Exception("服务端接口解析数据出现异常"); 155                 } 156             } 157             else//reads==0客户端已关闭 158             { 159                 Close();                     160             } 161         } 162         catch (Exception ex) 163         { 164             Close(); 165              166             Console.WriteLine("ReceiveCallback Error"); 167             Console.WriteLine(ex.Message); 168         } 169  170     } 171     private void SloveTelData() 172     { 173          174         //进行数据解析 175         SloveTelDataUtil slo = new SloveTelDataUtil(); 176  177         if (telType == TelgramType.Schedule) 178         { 179             List<Telegram_Schedule> dataEntitys = slo.Slove_Telegram_Schedule(receiveBuffer, receiveBufferLen,serverIP+":"+serverPort.ToString()); 180             //buffer已经被处理一遍了 使用新的长度 181             receiveBufferLen = receiveBuffer.Count; 182             //解析出的每一个对象都触发 onreceive 183             for (int i = 0; i < dataEntitys.Count; i++) 184             { 185                 if (onReceive != null) 186                     onReceive(dataEntitys[i]); 187             } 188         } 189         else if (telType == TelgramType.SDBMsg) 190         { 191  192         } 193  194     } 195  196  197     public void Close() 198     { 199         try 200         { 201             _isConnected = false; 202  203             receiveBuffer.Clear(); 204             receiveBufferLen = 0; 205             if (workingSocket != null && workingSocket.Connected) 206                 workingSocket.Close(); 207         } 208         catch (Exception ex) 209         { 210             Console.WriteLine(ex.Message); 211         } 212  213     } 214  215 }

服务端调用

构建一个winform基本项目

 1 List<string> clients;  2 MsgServerSchedule server;  3 private void btn_start_Click(object sender, EventArgs e)  4 {  5     server = new MsgServerSchedule(int.Parse(tbx_port.Text));  6   7   8     server.Start();  9     if (server.isRunning == true) 10     { 11         btn_start.Enabled = false; 12  13         server.onReceive += new Action<Telegram_Base>( 14         (tel) => 15         { 16             this.BeginInvoke(new Action(() => 17             { 18                 if (tel is Telegram_Schedule) 19                 { 20                     Telegram_Schedule ts = tel as Telegram_Schedule; 21                     ts.SloveToTel(); 22                     Console.WriteLine(string.Format("commandType:{0}", ((ScheduleTelCommandType)ts.command).ToString())); 23  24                     tbx_msgs.Text += ts.remoteIPPort + ">" + ts.jsonStr + "rn"; 25  26                     //数据回发测试 27                     string fromip = ts.remoteIPPort; 28                     string srcMsg = ts.jsonStr; 29                     string fromServerMsg = ts.jsonStr + " -from server"; 30                     ts.jsonStr = fromServerMsg; 31  32  33                     //如果消息里有指向信息 则转送到对应的客户端 34                     if (clients != null) 35                     { 36                         string to = null; 37                         for (int i = 0; i < clients.Count; i++) 38                         { 39                             if (srcMsg.Contains(clients[i])) 40                             { 41                                 to = clients[i]; 42                                 break; 43                             } 44                         } 45  46                         if (to != null) 47                         { 48                             ts.remoteIPPort = to; 49                             string toMsg; 50                             //toMsg= srcMsg.Replace(to, ""); 51                             toMsg = srcMsg.Replace(to, fromip); 52                             ts.jsonStr = toMsg; 53                             ts.SerialToBytes(); 54  55                             server.SendToSomeOne(ts); 56                         } 57                         else 58                         { 59                             ts.SerialToBytes(); 60                             server.SendToSomeOne(ts); 61                         } 62                     } 63                 } 64             })); 65  66         } 67         ); 68  69         server.onClientAddDel += new Action<List<string>>((onlines) => 70         { 71             this.BeginInvoke( 72                 new Action(() => 73                 { 74                     clients = onlines; 75                     listbox_clients.Items.Clear(); 76  77                     for (int i = 0; i < onlines.Count; i++) 78                     { 79                         listbox_clients.Items.Add(onlines[i]); 80                     } 81                 })); 82         }); 83     } 84 } 85 private void btn_sendAll_Click(object sender, EventArgs e) 86 { 87     Telegram_Schedule tel = new Telegram_Schedule(); 88     tel.header = GlobalSymbol.HeaderSymbol; 89     tel.command = (int)ScheduleTelCommandType.StartC2S; 90     tel.jsonStr = tbx_sendAll.Text; 91     tel.SerialToBytes(); 92  93     server.SendToAll(tel); 94 }

客户端调用

 1 MsgClientSchedule client;  2   3 private void btn_start_Click(object sender, EventArgs e)  4 {  5     client = new MsgClientSchedule(tbx_ip.Text, int.Parse(tbx_port.Text));  6   7     client.Connect();  8   9     if (client.isConnected == true) 10     { 11         btn_start.Enabled = false; 12          13         label1.Text = client.localIPPort; 14  15         client.onReceive = new Action<Telegram_Base>((tel) => 16         { 17             this.BeginInvoke( 18                 new Action(() => 19                 { 20                     tel.SloveToTel(); 21                     tbx_rec.Text += tel.jsonStr + "rn"; 22  23                 })); 24         }); 25     } 26  27 } 28  29  30  31 private void btn_send_Click(object sender, EventArgs e) 32 { 33  34     if (client == null || client.isConnected == false) 35         return; 36  37     //for (int i = 0; i < 2; i++) 38     //{ 39         Telegram_Schedule tel = new Telegram_Schedule(); 40         tel.command = (int)ScheduleTelCommandType.MsgC2S; 41      42         tel.jsonStr = tbx_remoteip.Text+">"+ tbx_msgSend.Text; 43         tel.SerialToBytes();//发出前要先序列化 44  45         client.Send(tel); 46     //} 47      48 }

实现效果

可以多客户端连接互相自由发送消息,服务端可以编写转发规则代码,那些什么棋牌啊 互动白板 以及其他类似的应用就可以基于此之上发挥想象了

一个简易socket通信结构