Question

Hello guys I'm pretty new to the whole async stuff and it would be nice if you could give me some advice. I'm not really sure if my approach is OK.

Lets say I have a bus device that is reading data and it fires an event if a telegram is completed. Now I want to check each telegram for its length. If length == expectation -> OK if not try until is OK or timeout. But it want to check for length 1, 2 and 5 at the same time.

UPDATE: OK I changed my example to an async approach but I still can't figure out how this should help me with my problem? OK on the plus side I don't have threads anymore that are blocked most of the time, but this wasn't my problem :(

So I try to explain in a different way. I want a async method that listens on the bus and returns the telegram that match the defined length

async Task<byte[]> GetTelegramAsync(int length, Timespan timeout)

I want to do something like this

Task<byte[]> t1 = GetTelegramAsync(1);
Task<byte[]> t2 = GetTelegramAsync(6);
Task<byte[]> t4 = GetTelegramAsync(4);
Task t4 = DoOtherStuffAsync();
DoStuff();

Task.WaitAll(AsyncRsp(t1), AsyncRsp(t2), AsyncRsp(t3), t4);

/* Output
Get telegram with length of 1
Get telegram with length of 6
Get telegram with length of 4 
Start doing other async stuff
Sync stuff done...
Telegram found 0x00 0x01 0x02 0x03 0x04 0x05
Async stuff done...
Telegram found 0xFF
Telegram with length 4 not found
*/

Here is my first BusDevice class. A thread starts that listens on the bus, if a telegram is received an event fires.

class BusDeviceThread
{
    private readonly Random _r = new Random();
    private Thread _t;

    public event EventHandler<TelegramReceivedArgs> TelegramReceived;               

    public void Connect()
    {
        _t = new Thread(FetchBusData)
        {
            Name = "FetchBusData",
            Priority = ThreadPriority.Normal
        };
        _t.Start();            
    }

    public void Close()
    {
        _t.Abort();
        _t.Join();
    }       

    private void FetchBusData()
    {
        while (true)
        {
            Thread.Sleep(_r.Next(100, 1000));

            var buffer = new byte[_r.Next(1, 10)];
            _r.NextBytes(buffer);
            OnTelegramReceived(new TelegramReceivedArgs(buffer));
        }
    }

    private void OnTelegramReceived(TelegramReceivedArgs e)
    {
        var handler = TelegramReceived;
        if (handler != null) handler(this, e);
    }        
}

And here is the changed BusDevice class utilizing async await.

class BusDeviceAsync
{
    private readonly Random _r = new Random();

    public event EventHandler<TelegramReceivedArgs> TelegramReceived;

    public async Task Connect(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var telegram = await FetchBusData();
            OnTelegramReceived(new TelegramReceivedArgs(telegram.ToArray()));
        }
    }

    private async Task<IEnumerable<byte>> FetchBusData()
    {
        await Task.Delay(_r.Next(100, 1000));

        var buffer = new byte[_r.Next(1, 10)];
        _r.NextBytes(buffer);

        return buffer;
    }

    private void OnTelegramReceived(TelegramReceivedArgs e)
    {
        var handler = TelegramReceived;
        if (handler != null) handler(this, e);
    }
}

Like I said it doesn't help me with my problem, the

async Task<byte[]> GetTelegramAsync(int length, Timespan timeout)

implementation stays the same or do I miss a point here?

byte[] GetTelegram(int length, TimeSpan timeout)
{
   byte[] telegram = null;

   using (var resetEvent = new AutoResetEvent(false))
   {
       EventHandler<TelegramReceivedArgs> handler = (sender, e) =>
            {
                var t = e.Telegram;
                if (Check(t, length))
                {
                    telegram = t;
                    resetEvent.Set(); 
                }
            };
         _d.TelegramReceived += handler;
        resetEvent.WaitOne(timeout.Milliseconds);
        _d.TelegramReceived -= handler;
    }

    return telegram ?? new byte[0];
}

async Task<byte[]> GetTelegramAsync(int length, TimeSpan timeout)
{
    return await Task.Run(() => GetTelegram(length, timeout));
}
Was it helpful?

Solution

I updated my example, but I can't figure out the difference regarding my problem. Well I certainly have fixed the blocked thread "problem".

This is not exactly what I meant, you're still using the pull model for your data (now with help of Task.Delay), not the push model (where the notification is coming asynchronously from the bus driver, as shown here).

Anyway, I think the following implementation might be what you're looking for. Note it doesn't explicitly use threads at all, beside for async I/O bus read simulation. Substitute the real device APM API for readBus:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        public class TelegramEventArg: EventArgs
        {
            public byte[] Data { get; set; }
        }

        public EventHandler<TelegramEventArg> TelegramEvent = delegate { };

        async Task<byte[]> ReadTelegramAsync(int size, CancellationToken token)
        {
            var tcs = new TaskCompletionSource<byte[]>();
            EventHandler<TelegramEventArg> handler = null;
            bool subscribed = false;

            handler = (s, e) => 
            {
                if (e.Data.Length == size)
                {
                    this.TelegramEvent -= handler;
                    subscribed = false;
                    tcs.TrySetResult(e.Data);
                }
            };

            this.TelegramEvent += handler;
            try
            {
                subscribed = true;
                using (token.Register(() => tcs.TrySetCanceled()))
                {
                    await tcs.Task.ConfigureAwait(false);
                    return tcs.Task.Result;
                }
            }
            finally
            {
                if (subscribed)
                    this.TelegramEvent -= handler;
            }
        }

        async Task ReadBusAsync(CancellationToken token)
        {
            while (true)
            {
                // get data from the bus
                var data = await Task.Factory.FromAsync(
                    (asyncCallback, asyncState) => 
                        readBus.BeginInvoke(asyncCallback, asyncState),
                    (asyncResult) => 
                        readBus.EndInvoke(asyncResult), 
                    state: null).ConfigureAwait(false);

                token.ThrowIfCancellationRequested();
                this.TelegramEvent(this, new TelegramEventArg { Data = data });
            }
        }

        // simulate the async bus driver with BeginXXX/EndXXX APM API
        static readonly Func<byte[]> readBus = () =>
        {
            var random = new Random(Environment.TickCount);
            Thread.Sleep(random.Next(1, 500));
            var data = new byte[random.Next(1, 5)];
            Console.WriteLine("A bus message of {0} bytes", data.Length);
            return data;
        };

        static void Main(string[] args)
        {
            try
            {
                var program = new Program();
                var cts = new CancellationTokenSource(Timeout.Infinite); // cancel in 10s

                var task1 = program.ReadTelegramAsync(1, cts.Token);
                var task2 = program.ReadTelegramAsync(2, cts.Token);
                var task3 = program.ReadTelegramAsync(3, cts.Token);

                var busTask = program.ReadBusAsync(cts.Token);

                Task.WaitAll(task1, task2, task3);
                Console.WriteLine("All telegrams received");
                cts.Cancel(); // stop ReadBusAsync
            }
            catch (Exception ex)
            {
                while (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }
            Console.ReadLine();
        }
    }
}

Also, this scenario appears to be an ideal candidate for implementation using Reactive Extensions (Rx). As time allows, I'll show how to do that.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top