怎样关闭一个处于阻塞状态的tcp接收线程 如题,阻塞了 不能内部break 解决方案 » 免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货 俺们不知道你想表达什么。对,异步阻塞的。有数据来了就读,没数据来就自己阻塞。并没有任何毛病。同时他到底要怎么个break难道是where(ture){ 异步读 ///你想在这里break,他卡在上面了?}我们说,没啥问题没数据自然就卡在上面,如果有数据自然到下面。如果你想强行断开,把tcpclient.close了,他就自然而然的异常,然后你处理异常就是 TcpClient xx = new TcpClient(); while (xx.Connected) { if (xx.Available > 0) { try { await xx.GetStream().ReadAsync( cancellationToken:); //这里其实也有一个带canceltoken的重载,对,除了tcpclient.close 让他自己异常,也可以用canceltoken控制是否继续下去 } catch (Exception e) { Console.WriteLine(e); throw; } } } 阻塞了不是挺好吗,不占CPU,一个线程用来接收就够了,不用弄好几个线程都去接收。 无效连接,简单处理既然你说是“很多无效的连接过来了”,说明你是服务器端。那很简单的不管你是用所谓的IOCP,还是用其他的手段,开监听。acceptconnect确认连接以后就可以使用system..runtime.cache 下一个缓存依赖(相对过期时间控制)每次接收数据结束,访问一下cachecache失效触发移除事件,在移除事件里把tcpclient.close掉就好了(即使用那个IOCP的,其实也一样,你close掉,那个堵塞的自动异常,然后自动异常触发,归还入池) 弄个服务器比较复杂,我这里简单用tcpclient搞个demo,自己看把 System.Runtime.Caching.MemoryCache cache = new MemoryCache("tcpclientCache"); private async void button1_Click(object sender, EventArgs e) { TcpClient tcpClient = new TcpClient(); await tcpClient.ConnectAsync(IPAddress.Parse("192.168.4.100"), 6800); CacheItemPolicy policy = new CacheItemPolicy(); policy.SlidingExpiration = TimeSpan.FromMinutes(1); policy.RemovedCallback = p => { MessageBox.Show("缓存过期,我准备移除了"); var temp = (TcpClient)p.CacheItem.Value; temp.Client.Shutdown(SocketShutdown.Both); temp.Client.Close(); }; cache.Add("t1", tcpClient, policy); Task.Factory.StartNew(async () => { byte[] buffer =new byte[1024]; while (tcpClient.Connected) { if (tcpClient.Available > 0) { try { var i = await tcpClient.Client.ReceiveAsync(new ArraySegment<byte>(buffer), SocketFlags.None); var obj = cache.Get("t1");//此处访问一下,更新一下缓存依赖时间,当然有可能null,我简单演示就不处理了 var b = 0; } catch (Exception exception) { MessageBox.Show("异常了,此处应该处理,tcpclient断线处理,不管是对方断的,还是你自己断的"); } } } //此处循环断了,tcpclient确定没连了,该处理的处理 }); } TCp server ,等待设备连接,统计流量和丢包率,[] 里面的数字是包序号,0-99, 如果不连续 , 则判定丢包代码:using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Windows.Forms;using System.Net.Sockets;using System.Threading;using System.Net;using System.Data.SqlClient;namespace tcpServer{ public partial class Form_TcpServer : Form { public Form_TcpServer() { InitializeComponent(); txtPort.Text = "9444"; //初始化允许拖拽 InitDragInFile(); //初始化DataGridView InitDataGridView(); timer1.Interval = 1000; timer1.Start(); listBox1.Visible = false; } /// <summary> /// 监听套接字 /// </summary> private Socket socketWatch = null; //监听线程句柄 private Thread threadWatch = null; //通信线程套接字列表 List<Socket> listSocket = new List<Socket>(); //通信线程线程列表 List<Thread> listThread = new List<Thread>(); //IPEndPoint、帧率列表 private Dictionary<string, double> dictSps = new Dictionary<string, double>(); //IPEndpoint、丢包数列表 private Dictionary<string,int> dictLosePkts = new Dictionary<string,int>(); /// <summary> /// 初始化监听套接字 /// </summary> private void InitWatchSocket() { try { //构建socket对象 socketWatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); //构建ipaddress 对象 IPAddress ipaddress = IPAddress.Any; //构建endPoint对象 IPEndPoint endPoint = new IPEndPoint(ipaddress, Convert.ToInt32(txtPort.Text)); //端口绑定 socketWatch.Bind(endPoint); //监听 socketWatch.Listen(20); //创建监听线程 threadWatch = new Thread(watchThreadProc); //将窗体线程设置为与后台同步 threadWatch.IsBackground = true; //开启线程 threadWatch.Start(socketWatch); btnListen.Text = "监听中.."; MessageBox.Show("开启监听成功"); } catch(Exception er) { MessageBox.Show(er.Message); } } /// <summary> /// 等待客户端的连接,并且创建与之通信的socket /// </summary> private void watchThreadProc(object obj) { Socket sckWatch = obj as Socket; while (true) { //阻塞 Socket sock = sckWatch.Accept();//socketWatch.Accept() //委托 ParameterizedThreadStart pts = new ParameterizedThreadStart(ThreadRecvProc); Thread thr = new Thread(pts); thr.IsBackground = true; thr.Start(sock); listSocket.Add(sock); listThread.Add(thr); } } /// <summary> /// tcp 接收数据解析 /// </summary> /// <param name="temp"></param> /// <param name="state"></param> /// <param name="No"></param> /// <returns></returns> private bool Parse(byte temp, ref byte state, ref int No) { bool ret = false; switch(state) { case 0: if (temp == '[') state = 1; else state = 0; break; case 1://shi state = 2; No = temp - '0'; break; case 2://ge state = 3; No = No * 10 + temp - '0'; break; case 3: if (temp == ']') { ret = true; } else { No = 0; } state = 0; break; default: state = 0; break; } return ret; } /// <summary> /// tcp 接收线程 /// </summary> /// <param name="obj"></param> private void ThreadRecvProc(object obj) { Socket s = obj as Socket; byte[] buffer = new byte[1500]; long startTime = DateTime.Now.ToUniversalTime().Ticks; double sps = 0; UInt32 sumRecv = 0; int ret = 0; string remote = s.RemoteEndPoint.ToString(); byte state = 0; int No = 0, lastNo =-1; int losePkts = 0; while (true) { try { ret = s.Receive(buffer, 1500, SocketFlags.None); for( int i = 0; i< ret; i++) { if( Parse(buffer[i], ref state, ref No) == true ) { if( lastNo == -1) { lastNo = No; continue; } else { if(No == 0) { if (lastNo != 99) losePkts++; } else { if (No - lastNo != 1) losePkts++; } } lastNo = No; dictLosePkts[remote] = losePkts; } } } catch(Exception er) { //MessageBox.Show(er.Message); //错误提示一般是: 远程主机关闭了一个远程连接 Console.WriteLine( er.Message ); break; } long endTime = DateTime.Now.ToUniversalTime().Ticks; sumRecv += (UInt32)ret; if (endTime - startTime >= 10000 * 1000)//1s { startTime = DateTime.Now.ToUniversalTime().Ticks; sps = sumRecv * 8.0 / 1024 / 1024; sumRecv = 0; dictSps[remote] = sps; //Console.WriteLine("sps is {0}", sps); } //Console.WriteLine("ret is {0}", ret); } } private delegate void delegate_refresh(double sps); /// <summary> /// 刷新listBox控件 /// </summary> /// <param name="sps"></param> private void refreshListBox(double sps) { if (listBox1.InvokeRequired) { delegate_refresh d = new delegate_refresh(refreshListBox); this.Invoke(d, new object[] { sps }); } else { listBox1.Items.Add(sps.ToString()); } } private void btnListen_Click(object sender, EventArgs e) { InitWatchSocket(); } /// <summary> /// 初始化允许拖拽 /// </summary> private void InitDragInFile() { this.AllowDrop = true; } /// <summary> /// 打印输出拖入文件的路径 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void Form1_DragEnter(object sender, DragEventArgs e) { if (e.Data.GetDataPresent(DataFormats.FileDrop)) { string[] content = (string[])e.Data.GetData(DataFormats.FileDrop); foreach (var s in content) { Console.WriteLine("xx:" + s); } } } /// <summary> /// 初始化DataGridView控件 /// </summary> private void InitDataGridView() { //设置列数为3列 dataGridView1.ColumnCount = 3; string headers = "地址 速率(Mbps) 丢帧数(个)"; string[] header = headers.Split(' '); //设置列标题和列宽度 for (int i = 0; i < dataGridView1.ColumnCount; i++) { dataGridView1.Columns[i].HeaderText = header[i]; dataGridView1.Columns[i].Width = 150; } //设置最后一列为自动填充 dataGridView1.Columns[dataGridView1.ColumnCount - 1].AutoSizeMode = DataGridViewAutoSizeColumnMode.Fill; //选择模式为整行选中 dataGridView1.SelectionMode = DataGridViewSelectionMode.FullRowSelect; //添加第一行 //DataGridViewRow row = new DataGridViewRow(); //row.CreateCells(dataGridView1); //for (int i = 0; i < 3; i++) //{ // row.Cells[i].Value = i.ToString(); //} ////添加DataGridViewRow //dataGridView1.Rows.Add(row); } //定义委托类型 delegate void delegate_refreshDgv(Dictionary<string, double> dic, Dictionary<string, int> dic2); private void refreshDataGridView(Dictionary<string, double> dic, Dictionary<string, int> dic2) { //在线程中调用 if(this.InvokeRequired) { delegate_refreshDgv d = new delegate_refreshDgv(refreshDataGridView); this.Invoke(d, new object[] { dic, dic2 }); } else { dataGridView1.Rows.Clear(); try { foreach (var d in dic) 你贴得代码不是一样得, ThreadRecvProc(object obj)这个obj就是一个Socket么,那还有啥说得,自己把sokect close掉,就会自动进入你后面写得那个catch里面 s.Shutdown(SocketShutdown.Both); s.Close();这样就行了,想触发他,可以用上面得cache,如果不想用cache,用 while (s.Connected) { CancellationTokenSource cts=new CancellationTokenSource(TimeSpan.FromMinutes(1)); cts.Token.Register(() => { s.Shutdown(SocketShutdown.Both); s.Close(); }); ret = s.Receive(buffer, 1500, SocketFlags.None); } Task<bool> aTask = Task<bool>.Factory.StartNew(() => { for (int i = 0; i < 3; i++) { CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); CancellationToken token = cts.Token; token.Register(() => { if (cts != null) { Trace.WriteLine("线程a,超时取消"); } }); Task.Delay(TimeSpan.FromSeconds(3)).Wait(); cts = null; } return true; }); Task<bool> bTask = Task<bool>.Factory.StartNew(() => { for (int i = 0; i < 3; i++) { CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); CancellationToken token = cts.Token; token.Register(() => { if (cts != null) { Trace.WriteLine("线程b,超时取消"); } }); Task.Delay(TimeSpan.FromSeconds(1)).Wait(); cts = null; } return true; });其实超时处理有很多手段,我们只是展示一些常用得手段。那啥信号量,waitone这类我不打算写了,一次给太多消化不良ps:你是时候可以看看Task,async/wait 这类东西了,在往后面还不打算碰这些玩意得话,不说看nuget上得代码,就是他介绍给得简单演示都没办法看了 绝对的大师那个高级编程里面介绍 async/wait 这些的章节直接被我跳过去了。 同步tcp,如果因为recv阻塞等待,把socket关闭了,就可以不阻塞,退出循环 了。使用异步通讯 肯定是单拿一个线程来做的吧,放进线程池,得到future,另起线程检测 设置socket超时时间即可,如果超时没有数据接收就返回了 可以用cancellationTokenSource.Cancel()取消任务,但是如果不监听cancellationToken.IsCancellationRequested属性,或者不调用ThrowIfCancellationRequested方法任务一直执行下去 阻塞 也有退出条件,比如 超时 断开 等, 再soketerror 有阻塞中断的愿因 我知道 TerminateThread 函数 设置timeout时间应该可以吧 兄弟,你代码肯定有问题。不管你的接受逻辑是在一个单独线程还是在当前线程,你都不应该让接受逻辑阻塞。应该及时的让出cpu让其他代码干活儿,这样你的代码肯定效率会更高 C#自动解压压缩包 C#程序最小到系统托盘恢复的问题 关于C#编写com组件,供ASP使用的问题。 oledbdataadapter fill() 问题 请问对象的实例化,所需分配的内存空间是在编译期还是运行期分配? 帮忙解决嵌入 FireFox问题 通过OLEDB控件连接ORACLE数据库 c# 的 自定义菜单问题 怎样使文本框只能填入数字 《C#高级编程》此书的英文版叫《Professional C# (Beta 2 Edition)》!可见清华出版社用心之险恶! c#使用parent为什么图片向下移动了 如何根据字符串得到图片
{
异步读
///你想在这里break,他卡在上面了?
}我们说,没啥问题没数据自然就卡在上面,如果有数据自然到下面。如果你想强行断开,把tcpclient.close了,他就自然而然的异常,然后你处理异常就是
{
if (xx.Available > 0)
{
try
{
await xx.GetStream().ReadAsync( cancellationToken:); //这里其实也有一个带canceltoken的重载,对,除了tcpclient.close 让他自己异常,也可以用canceltoken控制是否继续下去
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
} }
}
System.Runtime.Caching.MemoryCache cache = new MemoryCache("tcpclientCache");
private async void button1_Click(object sender, EventArgs e)
{
TcpClient tcpClient = new TcpClient();
await tcpClient.ConnectAsync(IPAddress.Parse("192.168.4.100"), 6800);
CacheItemPolicy policy = new CacheItemPolicy();
policy.SlidingExpiration = TimeSpan.FromMinutes(1); policy.RemovedCallback = p =>
{
MessageBox.Show("缓存过期,我准备移除了");
var temp = (TcpClient)p.CacheItem.Value;
temp.Client.Shutdown(SocketShutdown.Both);
temp.Client.Close();
};
cache.Add("t1", tcpClient, policy); Task.Factory.StartNew(async () =>
{
byte[] buffer =new byte[1024]; while (tcpClient.Connected)
{
if (tcpClient.Available > 0)
{
try
{
var i = await tcpClient.Client.ReceiveAsync(new ArraySegment<byte>(buffer), SocketFlags.None);
var obj = cache.Get("t1");//此处访问一下,更新一下缓存依赖时间,当然有可能null,我简单演示就不处理了
var b = 0;
}
catch (Exception exception)
{
MessageBox.Show("异常了,此处应该处理,tcpclient断线处理,不管是对方断的,还是你自己断的");
}
} }
//此处循环断了,tcpclient确定没连了,该处理的处理
}); }
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Net.Sockets;
using System.Threading;
using System.Net;
using System.Data.SqlClient;namespace tcpServer
{
public partial class Form_TcpServer : Form
{
public Form_TcpServer()
{
InitializeComponent(); txtPort.Text = "9444"; //初始化允许拖拽
InitDragInFile(); //初始化DataGridView
InitDataGridView(); timer1.Interval = 1000;
timer1.Start();
listBox1.Visible = false;
} /// <summary>
/// 监听套接字
/// </summary>
private Socket socketWatch = null; //监听线程句柄
private Thread threadWatch = null;
//通信线程套接字列表
List<Socket> listSocket = new List<Socket>();
//通信线程线程列表
List<Thread> listThread = new List<Thread>();
//IPEndPoint、帧率列表
private Dictionary<string, double> dictSps = new Dictionary<string, double>();
//IPEndpoint、丢包数列表
private Dictionary<string,int> dictLosePkts = new Dictionary<string,int>(); /// <summary>
/// 初始化监听套接字
/// </summary>
private void InitWatchSocket()
{
try
{
//构建socket对象
socketWatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
//构建ipaddress 对象
IPAddress ipaddress = IPAddress.Any;
//构建endPoint对象
IPEndPoint endPoint = new IPEndPoint(ipaddress, Convert.ToInt32(txtPort.Text));
//端口绑定
socketWatch.Bind(endPoint);
//监听
socketWatch.Listen(20);
//创建监听线程
threadWatch = new Thread(watchThreadProc);
//将窗体线程设置为与后台同步
threadWatch.IsBackground = true;
//开启线程
threadWatch.Start(socketWatch); btnListen.Text = "监听中.."; MessageBox.Show("开启监听成功");
}
catch(Exception er)
{
MessageBox.Show(er.Message);
}
}
/// <summary>
/// 等待客户端的连接,并且创建与之通信的socket
/// </summary>
private void watchThreadProc(object obj)
{
Socket sckWatch = obj as Socket; while (true)
{
//阻塞
Socket sock = sckWatch.Accept();//socketWatch.Accept() //委托
ParameterizedThreadStart pts = new ParameterizedThreadStart(ThreadRecvProc); Thread thr = new Thread(pts); thr.IsBackground = true; thr.Start(sock); listSocket.Add(sock);
listThread.Add(thr); }
}
/// <summary>
/// tcp 接收数据解析
/// </summary>
/// <param name="temp"></param>
/// <param name="state"></param>
/// <param name="No"></param>
/// <returns></returns>
private bool Parse(byte temp, ref byte state, ref int No)
{
bool ret = false; switch(state)
{
case 0:
if (temp == '[') state = 1;
else state = 0;
break; case 1://shi
state = 2;
No = temp - '0';
break; case 2://ge
state = 3;
No = No * 10 + temp - '0';
break; case 3:
if (temp == ']')
{
ret = true;
}
else
{
No = 0;
}
state = 0;
break; default:
state = 0;
break;
} return ret;
} /// <summary>
/// tcp 接收线程
/// </summary>
/// <param name="obj"></param>
private void ThreadRecvProc(object obj)
{
Socket s = obj as Socket; byte[] buffer = new byte[1500]; long startTime = DateTime.Now.ToUniversalTime().Ticks; double sps = 0; UInt32 sumRecv = 0; int ret = 0; string remote = s.RemoteEndPoint.ToString(); byte state = 0; int No = 0, lastNo =-1; int losePkts = 0;
while (true)
{ try
{
ret = s.Receive(buffer, 1500, SocketFlags.None);
for( int i = 0; i< ret; i++)
{
if( Parse(buffer[i], ref state, ref No) == true )
{
if( lastNo == -1)
{
lastNo = No;
continue;
}
else
{
if(No == 0)
{
if (lastNo != 99) losePkts++;
}
else
{
if (No - lastNo != 1) losePkts++;
} }
lastNo = No; dictLosePkts[remote] = losePkts;
}
}
}
catch(Exception er)
{
//MessageBox.Show(er.Message);
//错误提示一般是: 远程主机关闭了一个远程连接
Console.WriteLine( er.Message );
break;
}
long endTime = DateTime.Now.ToUniversalTime().Ticks; sumRecv += (UInt32)ret; if (endTime - startTime >= 10000 * 1000)//1s
{
startTime = DateTime.Now.ToUniversalTime().Ticks; sps = sumRecv * 8.0 / 1024 / 1024; sumRecv = 0; dictSps[remote] = sps;
//Console.WriteLine("sps is {0}", sps);
} //Console.WriteLine("ret is {0}", ret);
}
} private delegate void delegate_refresh(double sps); /// <summary>
/// 刷新listBox控件
/// </summary>
/// <param name="sps"></param>
private void refreshListBox(double sps)
{
if (listBox1.InvokeRequired)
{
delegate_refresh d = new delegate_refresh(refreshListBox); this.Invoke(d, new object[] { sps });
}
else
{
listBox1.Items.Add(sps.ToString());
}
}
private void btnListen_Click(object sender, EventArgs e)
{
InitWatchSocket();
} /// <summary>
/// 初始化允许拖拽
/// </summary>
private void InitDragInFile()
{
this.AllowDrop = true;
} /// <summary>
/// 打印输出拖入文件的路径
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void Form1_DragEnter(object sender, DragEventArgs e)
{
if (e.Data.GetDataPresent(DataFormats.FileDrop))
{
string[] content = (string[])e.Data.GetData(DataFormats.FileDrop); foreach (var s in content)
{
Console.WriteLine("xx:" + s);
}
}
}
/// <summary>
/// 初始化DataGridView控件
/// </summary>
private void InitDataGridView()
{
//设置列数为3列
dataGridView1.ColumnCount = 3;
string headers = "地址 速率(Mbps) 丢帧数(个)";
string[] header = headers.Split(' '); //设置列标题和列宽度
for (int i = 0; i < dataGridView1.ColumnCount; i++)
{
dataGridView1.Columns[i].HeaderText = header[i];
dataGridView1.Columns[i].Width = 150;
}
//设置最后一列为自动填充
dataGridView1.Columns[dataGridView1.ColumnCount - 1].AutoSizeMode = DataGridViewAutoSizeColumnMode.Fill; //选择模式为整行选中
dataGridView1.SelectionMode = DataGridViewSelectionMode.FullRowSelect;
//添加第一行
//DataGridViewRow row = new DataGridViewRow();
//row.CreateCells(dataGridView1);
//for (int i = 0; i < 3; i++)
//{
// row.Cells[i].Value = i.ToString();
//}
////添加DataGridViewRow
//dataGridView1.Rows.Add(row);
} //定义委托类型
delegate void delegate_refreshDgv(Dictionary<string, double> dic, Dictionary<string, int> dic2);
private void refreshDataGridView(Dictionary<string, double> dic, Dictionary<string, int> dic2)
{
//在线程中调用
if(this.InvokeRequired)
{
delegate_refreshDgv d = new delegate_refreshDgv(refreshDataGridView);
this.Invoke(d, new object[] { dic, dic2 });
}
else
{
dataGridView1.Rows.Clear(); try
{
foreach (var d in dic)
这个obj就是一个Socket么,那还有啥说得,自己把sokect close掉,就会自动进入你后面写得那个catch里面 s.Shutdown(SocketShutdown.Both);
s.Close();这样就行了,想触发他,可以用上面得cache,如果不想用cache,用 while (s.Connected)
{
CancellationTokenSource cts=new CancellationTokenSource(TimeSpan.FromMinutes(1));
cts.Token.Register(() =>
{
s.Shutdown(SocketShutdown.Both);
s.Close();
});
ret = s.Receive(buffer, 1500, SocketFlags.None);
}
{
for (int i = 0; i < 3; i++)
{
CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
CancellationToken token = cts.Token;
token.Register(() =>
{
if (cts != null)
{
Trace.WriteLine("线程a,超时取消");
}
}); Task.Delay(TimeSpan.FromSeconds(3)).Wait(); cts = null;
} return true;
}); Task<bool> bTask = Task<bool>.Factory.StartNew(() =>
{
for (int i = 0; i < 3; i++)
{
CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
CancellationToken token = cts.Token;
token.Register(() =>
{
if (cts != null)
{
Trace.WriteLine("线程b,超时取消");
}
}); Task.Delay(TimeSpan.FromSeconds(1)).Wait(); cts = null;
} return true;
});
其实超时处理有很多手段,我们只是展示一些常用得手段。那啥信号量,waitone这类我不打算写了,一次给太多消化不良ps:你是时候可以看看Task,async/wait 这类东西了,在往后面还不打算碰这些玩意得话,不说看nuget上得代码,就是他介绍给得简单演示都没办法看了
使用异步通讯