C#同步异步回调状态机asyncawaitDemo

博客 动态
0 361
优雅殿下
优雅殿下 2022-01-24 14:01:07
悬赏:0 积分 收藏

C# 同步 异步 回调 状态机 async await Demo

源码

https://gitee.com/s0611163/AsyncAwaitDemo

为什么会研究这个?

我们项目的客户端和服务端通信用的是WCF,我就想,能不能用异步的方式调用WCF服务呢?或者说能不能用async await的方式调用WCF服务呢?

然后我发现WCF是通过BeginXXX和EndXXX这种回调的方式实现异步的,似乎不支持async await语法糖,那只能通过状态机的方式实现了,不然多个请求就会写成回调地狱。

如果用Task.Run包起来,里面再写个Wait,这就是假异步,还不如不用异步,按原来非异步的方式写。

研究了回调和状态机之后,又看了相关的博客,就像一篇博客里说的,调用WCF不难,异步调用WCF不难,异步调用WCF并保证调用顺序也不难,难的是实现了这些之后,代码的可读性和可维护性。

所以我的结论是,调用WCF,如果没有特殊需求,还是不要异步的好,写起来复杂。

C# 同步 异步 回调 状态机 async await Demo

主要演示了不使用async、await语法糖,通过回调函数和状态机实现异步

为什么要写这个Demo?

为了帮助理解异步,async、await经过编译生成的状态机代码,有点复杂,这个Demo里写了一个相对简单的状态机代码,便于理解

代码说明

代码中主要写了三种下载文件的示例

  1. 同步方式下载文件,为了防止下载较大文件时卡住界面,代码在线程中执行,文件下载完成之前,始终占用一个线程
  2. 异步方式下载文件,使用了async、await语法糖,下载文件时,可以看到,workerThreads(可用线程数)和completionPortThreads(可用异步线程数)会发生变化,但是不会长时间占用一个线程
  3. 异步方式下载文件,不使用async、await语法糖,通过回调函数和状态机实现,workerThreads(可用线程数)和completionPortThreads(可用异步线程数)会发生变化,但是不会长时间占用一个线程

结论:异步的本质是回调,异步是回调的语法糖

  1. 相比同步方式,使用异步方式下载文件时,忽略掉误差,下载速度并没有更快,异步的主要优点是不会长时间占用一个线程
  2. 在没有async、await语法糖时,使用回调函数和状态机也可以实现异步,但代码写的不够优雅,心智负担重,所以async、await的另一个优点是使代码简单
  3. 异步的本质就是回调,C#异步底层是通过系统级回调和状态机实现的,async、await会被编译成状态机代码,相关于用代码生成器生成了代码,但这个代码自己写的话,心智负担重
  4. 不使用async、await语法糖的前提下,使用状态机可以避免回调地狱
  5. 即使不使用async、await语法糖,依然感受到了异步的侵入性,没办法在底层完全封装起来,代码一直侵入到控件的事件里,使用async、await也是,你从代码的底层,写async一直写到控件的事件层

思考:

经测试,button4_Click、button5_Click、button6_Click在下载大文件时,耗时较长,但是界面并不会卡住,依然可以自由拖动窗体或点击其它按钮

button4_Click、button5_Click、button6_Click方法分别调用了AsyncFunctionWithCallback方法
AsyncFunctionWithCallback方法调用了HttpUtil类的HttpDownloadFileAsyncWithCallback方法

请仔细观察上面的方法,它们没有使用new Thread、Task.Run、Task.Factory.StartNew等线程相关的方法,
也就是说执行长时间操作,没有阻塞界面,但是又没有使用线程,
实际上下载文件过程中,没有使用C#的CLR线程池,测试时,workerThreads数量不会变化,但是你会看到completionPortThreads数量变化,
下载文件的过程使用了异步线程池,但是下载大文件时,并不会长时间占用异步线程池

Demo截图

测试代码Form1.cs

using Models;using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.IO;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Windows.Forms;using Utils;namespace AsyncAwaitDemo{    public partial class Form1 : Form    {        private DateTime _dt1;        private System.Timers.Timer _timer;        private string _testFileDownloadUrl = "http://desk-fd.zol-img.com.cn/t_s1920x1080c5/g6/M00/06/00/ChMkKWHXqV-IJFpUAEJatCdq_LUAAXW8AA1PvwAQlrM029.jpg?downfile=1642227460763.jpg"; //文件下载测试URL        //private string _testFileDownloadUrl = "https://down21.xiazaidb.com/app/xiaomanghe.apk"; //文件下载测试URL        //private string _testFileDownloadUrl = "https://codeload.github.com/0611163/DBHelper/zip/refs/heads/master"; //文件下载测试URL        //private string _testFileDownloadUrl = "http://down-ww5.537a.com/soft/3/ce/com.yhong.muchun_51982ada.apk"; //文件下载测试URL        //private string _testFileDownloadUrl = "https://dl.360safe.com/netunion/20140425/360se+191727+n3f07b78190.exe"; //文件下载测试URL 大一点的文件        public Form1()        {            InitializeComponent();        }        private void Form1_Load(object sender, EventArgs e)        {            //定时器会使用线程池中的一个线程            _timer = new System.Timers.Timer();            _timer.Interval = 100;            _timer.Elapsed += _timer_Elapsed;            _timer.Start();        }        #region Log        private void Log(string log)        {            if (!this.IsDisposed)            {                string msg = DateTime.Now.ToString("mm:ss.fff") + " " + log + "\r\n\r\n";                if (this.InvokeRequired)                {                    this.BeginInvoke(new Action(() =>                    {                        textBox1.AppendText(msg);                    }));                }                else                {                    textBox1.AppendText(msg);                }            }        }        #endregion        #region _timer_Elapsed        private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)        {            int workerThreads;            int completionPortThreads;            int maxWorkerThreads;            int maxCompletionPortThreads;            ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);            ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);            this.BeginInvoke(new Action(() =>            {                label1.Text = "程序当前使用线程数:" + (maxWorkerThreads - workerThreads) + "  workerThreads:" + workerThreads.ToString() + "  completionPortThreads:" + completionPortThreads;            }));        }        #endregion        #region button1_Click 测试同步方法        private void button1_Click(object sender, EventArgs e)        {            Task.Run(() => //下载文件是耗时操作,需要在线程中执行,否则界面卡住            {                Log("开始");                DateTime dt = DateTime.Now;                //输入参数                string arg1 = "1";                string arg2 = "2";                string arg3 = "3";                //方法调用                string result1 = SyncFunction(arg1);                string result2 = SyncFunction(arg2);                string result3 = SyncFunction(arg3);                //输出结果                Log(result1);                Log(result2);                Log(result3);                Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000"));            });        }        #endregion        #region button2_Click 测试异步方法(三个异步方法并行,按顺序输出)        private async void button2_Click(object sender, EventArgs e)        {            Log("开始");            DateTime dt = DateTime.Now;            //输入参数            string arg1 = "1";            string arg2 = "2";            string arg3 = "3";            //方法调用            var t1 = AsyncFunction(arg1);            var t2 = AsyncFunction(arg2);            var t3 = AsyncFunction(arg3);            string result1 = await t1;            string result2 = await t2;            string result3 = await t3;            //输出结果            Log(result1);            Log(result2);            Log(result3);            Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000"));        }        #endregion        #region button3_Click 测试异步方法(三个异步方法顺序执行,按顺序输出)        private async void button3_Click(object sender, EventArgs e)        {            Log("开始");            DateTime dt = DateTime.Now;            //输入参数            string arg1 = "1";            string arg2 = "2";            string arg3 = "3";            //方法调用            string result1 = await AsyncFunction(arg1);            string result2 = await AsyncFunction(arg2);            string result3 = await AsyncFunction(arg3);            //输出结果            Log(result1);            Log(result2);            Log(result3);            Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000"));        }        #endregion        #region button4_Click 测试状态机(三个异步方法顺序执行,按顺序输出)(等价于button3_Click)        private void button4_Click(object sender, EventArgs e)        {            List<object> resultList = new List<object>();            //回调、状态机,不使用async await语法糖            Action<int, object> awaitAction = null;            awaitAction = (state, result) =>            {                //输入参数                string arg1 = "1";                string arg2 = "2";                string arg3 = "3";                //方法调用                if (state == 0)                {                    Log("开始");                    _dt1 = DateTime.Now;                    AsyncFunctionWithCallback(arg1, r => awaitAction(1, r));                    return;                }                if (state == 1)                {                    resultList.Add(result);                    AsyncFunctionWithCallback(arg2, r => awaitAction(2, r));                    return;                }                if (state == 2)                {                    resultList.Add(result);                    AsyncFunctionWithCallback(arg3, r => awaitAction(3, r));                    return;                }                //输出结果                if (state == 3)                {                    resultList.Add(result);                    foreach (object item in resultList)                    {                        Log(item != null ? item.ToString() : "null");                    }                    Log("结束,耗时:" + DateTime.Now.Subtract(_dt1).TotalSeconds.ToString("0.000"));                    return;                }            };            awaitAction(0, null);        }        #endregion        #region button5_Click 测试状态机(三个异步方法并行,按顺序输出)(等价于button2_Click)        private void button5_Click(object sender, EventArgs e)        {            object[] resultArray = new object[3];            //是否已调用回调函数            bool _completedCalled = false;            //锁            object _lock = new object();            //输出结果            Action<object[]> output = arr =>            {                if (arr.Count(a => a != null) == 3)                {                    lock (_lock)                    {                        if (!_completedCalled)                        {                            _completedCalled = true;                            foreach (object item in arr)                            {                                Log(item != null ? item.ToString() : "null");                            }                            Log("结束,耗时:" + DateTime.Now.Subtract(_dt1).TotalSeconds.ToString("0.000"));                        }                    }                }            };            //回调、状态机,不使用async await语法糖            Action<int, object> awaitAction = null;            awaitAction = (state, result) =>            {                //输入参数                string arg1 = "1";                string arg2 = "2";                string arg3 = "3";                //方法调用                if (state == 0)                {                    Log("开始");                    _dt1 = DateTime.Now;                    AsyncFunctionWithCallback(arg1, r => awaitAction(1, r));                    AsyncFunctionWithCallback(arg2, r => awaitAction(2, r));                    AsyncFunctionWithCallback(arg3, r => awaitAction(3, r));                    return;                }                //输出结果                if (state == 1)                {                    resultArray[0] = result;                    output(resultArray);                    return;                }                if (state == 2)                {                    resultArray[1] = result;                    output(resultArray);                    return;                }                if (state == 3)                {                    resultArray[2] = result;                    output(resultArray);                    return;                }            };            awaitAction(0, null);        }        #endregion        #region button6_Click 测试状态机(三个异步方法并行,按顺序输出)(等价于button2_Click)(相对于button5_Click更优雅的实现)        private void button6_Click(object sender, EventArgs e)        {            Log("开始");            DateTime dt = DateTime.Now;            //输入参数            string arg1 = "1";            string arg2 = "2";            string arg3 = "3";            //操作结果            object result1 = null;            object result2 = null;            object result3 = null;            //方法调用            Await await = new Await();            await.Add(() => AsyncFunctionWithCallback(arg1, r => await.Collect(r, out result1)))                .Add(() => AsyncFunctionWithCallback(arg2, r => await.Collect(r, out result2)))                .Add(() => AsyncFunctionWithCallback(arg3, r => await.Collect(r, out result3)))                .Completed(resultList => //输出结果                {                    Log(result1 != null ? result1.ToString() : "null");                    Log(result2 != null ? result2.ToString() : "null");                    Log(result3 != null ? result3.ToString() : "null");                    Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000"));                })                .Start();        }        #endregion        #region button7_Click 不使用状态机,执行带回调的异步方法(基本等价于button1_Click)        private void button7_Click(object sender, EventArgs e)        {            Task.Run(() =>            {                Log("开始");                DateTime dt = DateTime.Now;                //输入参数                string arg1 = "1";                string arg2 = "2";                string arg3 = "3";                //操作结果                object result1 = null;                object result2 = null;                object result3 = null;                //方法调用                AsyncFunctionWithCallback(arg1, r => result1 = r);                AsyncFunctionWithCallback(arg2, r => result2 = r);                AsyncFunctionWithCallback(arg3, r => result3 = r);                //等待三个操作结果全部返回,三个操作结果返回之前,始终占用一个线程                while (result1 == null || result2 == null || result3 == null)                {                    Thread.Sleep(1);                }                //输出结果                Log(result1 != null ? result1.ToString() : "null");                Log(result2 != null ? result2.ToString() : "null");                Log(result3 != null ? result3.ToString() : "null");                Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000"));            });        }        #endregion        #region 同步方法        /// <summary>        /// 同步方法        /// </summary>        private string SyncFunction(string arg)        {            MemoryStream ms = HttpUtil.HttpDownloadFile(_testFileDownloadUrl);            return "同步方法 arg=" + arg + " 下载文件的字节数组长度=" + ms.Length;        }        #endregion        #region 异步方法        /// <summary>        /// 异步方法        /// </summary>        private async Task<string> AsyncFunction(string arg)        {            MemoryStream ms = await HttpUtil.HttpDownloadFileAsync(_testFileDownloadUrl);            return "异步方法 arg=" + arg + " 下载文件的字节数组长度=" + ms.Length;        }        #endregion        #region 带回调的异步方法        /// <summary>        /// 带回调的异步方法        /// </summary>        /// <param name="arg">传入参数</param>        /// <param name="callback">回调</param>        /// <param name="nextState">下一个状态</param>        private void AsyncFunctionWithCallback(string arg, Action<object> callback)        {            HttpUtil.HttpDownloadFileAsyncWithCallback(_testFileDownloadUrl, new HttpUtilAsyncState() { State = 0 }, obj =>            {                if (obj is Exception)                {                    Exception ex = obj as Exception;                    Log("错误:" + ex.Message);                }                else                {                    MemoryStream ms = obj as MemoryStream;                    string result = "带回调的异步方法 arg=" + arg + " 下载文件的字节数组长度=" + ms.Length;                    callback(result);                }            });        }        #endregion        #region 用Task模拟的异步方法        /*        #region 异步方法        /// <summary>        /// 异步方法        /// </summary>        private Task<string> AsyncFunction(string arg)        {            return Task.Run<string>(() =>            {                Thread.Sleep(2000);                return "异步方法 arg=" + arg;            });        }        #endregion        #region 带回调的异步方法        /// <summary>        /// 带回调的异步方法        /// </summary>        /// <param name="arg">传入参数</param>        /// <param name="callback">回调</param>        /// <param name="nextState">下一个状态</param>        private void AsyncFunctionWithCallback(string arg, Action<object> callback)        {            Task.Run(() =>            {                if (arg == "1")                {                    Thread.Sleep(2000);                }                else                {                    Thread.Sleep(2000);                }                string result = "带回调的异步方法 arg=" + arg;                callback(result);            });        }        #endregion        */        #endregion    }}

异步工具类Await.cs

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace Utils{    /// <summary>    /// 异步工具类    /// 代替async await语法糖    /// </summary>    public class Await    {        /// <summary>        /// 状态        /// </summary>        private int _state { get; set; }        /// <summary>        /// 异步操作集合        /// </summary>        private List<Action> _actionList = new List<Action>();        /// <summary>        /// 操作结果集合        /// </summary>        private List<object> _resultList = new List<object>();        /// <summary>        /// 完成后的回调        /// </summary>        private Action<List<object>> _completedCallback;        /// <summary>        /// 是否已调用回调函数        /// </summary>        private bool _completedCalled = false;        /// <summary>        /// 锁        /// </summary>        private object _lock = new object();        /// <summary>        /// 异步工具类 构造函数        /// </summary>        public Await()        {            _state = 0;        }        /// <summary>        /// 添加一个异步操作        /// </summary>        public Await Add(Action action)        {            _actionList.Add(action);            return this;        }        /// <summary>        /// 开始执行        /// </summary>        public Await Start()        {            foreach (Action action in _actionList)            {                action();            }            return this;        }        /// <summary>        /// 收集结果        /// </summary>        public Await Collect(object result, out object outResult)        {            outResult = result;            _resultList.Add(result);            lock (_lock)            {                _state++;                if (_state == _actionList.Count && _completedCallback != null && !_completedCalled)                {                    _completedCalled = true;                    _completedCallback(_resultList);                }            }            return this;        }        /// <summary>        /// 注册完成后的回调函数        /// </summary>        public Await Completed(Action<List<object>> completedCallback)        {            this._completedCallback = completedCallback;            return this;        }    }}

文件下载工具类HttpUtil.cs

下面的代码使用了三种方式实现文件下载

  1. 同步方式实现文件下载
  2. 异步方式实现文件下载
  3. 通过回调和状态机实现的异步下载
using Models;using System;using System.Collections.Generic;using System.Collections.Specialized;using System.IO;using System.Linq;using System.Net;using System.Text;using System.Threading;using System.Threading.Tasks;namespace Utils{    /// <summary>    /// Http上传下载文件    /// </summary>    public class HttpUtil    {        #region HttpDownloadFile 下载文件        /// <summary>        /// 下载文件        /// </summary>        /// <param name="url">下载文件url路径</param>        /// <param name="cookie">cookie</param>        public static MemoryStream HttpDownloadFile(string url, CookieContainer cookie = null, WebHeaderCollection headers = null)        {            try            {                // 设置参数                HttpWebRequest request = WebRequest.Create(url) as HttpWebRequest;                request.Method = "GET";                request.CookieContainer = cookie;                if (headers != null)                {                    foreach (string key in headers.Keys)                    {                        request.Headers.Add(key, headers[key]);                    }                }                //发送请求并获取相应回应数据                HttpWebResponse response = request.GetResponse() as HttpWebResponse;                //直到request.GetResponse()程序才开始向目标网页发送Post请求                Stream responseStream = response.GetResponseStream();                //创建写入流                MemoryStream stream = new MemoryStream();                byte[] bArr = new byte[10240];                int size = responseStream.Read(bArr, 0, (int)bArr.Length);                while (size > 0)                {                    stream.Write(bArr, 0, size);                    size = responseStream.Read(bArr, 0, (int)bArr.Length);                }                stream.Seek(0, SeekOrigin.Begin);                responseStream.Close();                return stream;            }            catch (Exception ex)            {                throw ex;            }        }        #endregion        #region HttpDownloadFile 下载文件(异步)        /// <summary>        /// 下载文件        /// </summary>        /// <param name="url">下载文件url路径</param>        /// <param name="cookie">cookie</param>        public static async Task<MemoryStream> HttpDownloadFileAsync(string url, CookieContainer cookie = null, WebHeaderCollection headers = null)        {            try            {                // 设置参数                HttpWebRequest request = WebRequest.Create(url) as HttpWebRequest;                request.Method = "GET";                request.CookieContainer = cookie;                if (headers != null)                {                    foreach (string key in headers.Keys)                    {                        request.Headers.Add(key, headers[key]);                    }                }                //发送请求并获取相应回应数据                HttpWebResponse response = await request.GetResponseAsync() as HttpWebResponse;                //直到request.GetResponse()程序才开始向目标网页发送Post请求                Stream responseStream = response.GetResponseStream();                //创建写入流                MemoryStream stream = new MemoryStream();                byte[] bArr = new byte[10240];                int size = await responseStream.ReadAsync(bArr, 0, (int)bArr.Length);                while (size > 0)                {                    stream.Write(bArr, 0, size);                    size = await responseStream.ReadAsync(bArr, 0, (int)bArr.Length);                }                stream.Seek(0, SeekOrigin.Begin);                responseStream.Close();                return stream;            }            catch (Exception ex)            {                throw ex;            }        }        #endregion        #region HttpDownloadFile 下载文件(基于回调的异步)        /// <summary>        /// 下载文件        /// </summary>        /// <param name="url">下载文件url路径</param>        /// <param name="cookie">cookie</param>        public static void HttpDownloadFileAsyncWithCallback(string url, HttpUtilAsyncState state = null, Action<object> callback = null, CookieContainer cookie = null, WebHeaderCollection headers = null)        {            try            {                if (state.State == 0)                {                    // 设置参数                    HttpWebRequest request = WebRequest.Create(url) as HttpWebRequest;                    request.Method = "GET";                    request.CookieContainer = cookie;                    if (headers != null)                    {                        foreach (string key in headers.Keys)                        {                            request.Headers.Add(key, headers[key]);                        }                    }                    //发送请求并获取相应回应数据                    request.BeginGetResponse(asyncResult =>                    {                        HttpUtilAsyncState asyncState = asyncResult.AsyncState as HttpUtilAsyncState;                        try                        {                            HttpWebResponse response = request.EndGetResponse(asyncResult) as HttpWebResponse;                            asyncState.ResponseStream = response.GetResponseStream();                            HttpDownloadFileAsyncWithCallback(null, asyncState, callback);                        }                        catch (Exception ex)                        {                            asyncState.Exception = ex;                            asyncState.State = 2;                            HttpDownloadFileAsyncWithCallback(null, asyncState, callback);                        }                    }, new HttpUtilAsyncState { Request = request, State = 1 });                    return;                }                if (state.State == 1)                {                    byte[] bArr = new byte[10240];                    state.ResponseStream.BeginRead(bArr, 0, (int)bArr.Length, asyncResult =>                    {                        HttpUtilAsyncState asyncState = asyncResult.AsyncState as HttpUtilAsyncState;                        try                        {                            int size = state.ResponseStream.EndRead(asyncResult);                            if (size > 0)                            {                                state.Stream.Write(bArr, 0, size);                                HttpDownloadFileAsyncWithCallback(null, asyncState, callback);                            }                            else                            {                                asyncState.State = 2;                                HttpDownloadFileAsyncWithCallback(null, asyncState, callback);                            }                        }                        catch (Exception ex)                        {                            asyncState.Exception = ex;                            asyncState.State = 2;                            HttpDownloadFileAsyncWithCallback(null, asyncState, callback);                        }                    }, state);                    return;                }                if (state.State == 2)                {                    if (state.Exception != null)                    {                        callback(state.Exception);                        return;                    }                    else                    {                        state.Stream.Seek(0, SeekOrigin.Begin);                        state.ResponseStream.Close();                        callback(state.Stream);                        return;                    }                }            }            catch (Exception ex)            {                throw ex;            }        }        #endregion    }}
posted @ 2022-01-22 20:56 秋荷雨翔 阅读(431) 评论(4) 编辑 收藏 举报
回帖
    优雅殿下

    优雅殿下 (王者 段位)

    2017 积分 (2)粉丝 (47)源码

    小小码农,大大世界

     

    温馨提示

    亦奇源码

    最新会员