Copying one file to another using BlockingCollection: the destination differs from the source and is sometimes garbage. Am I overlooking anything in my code?

StackOverflow https://stackoverflow.com/questions/22363889

Question

EDIT & Update: I have now tried the same code on my personal computer and it works fine. I was able to copy any type of file with it without any problem. The issue only occurs when I run the code on my work computer. I just don't understand how or why this depends on the computer. Please let me know whether I am missing anything here.

In readTask I read a file sequentially and add the bytes to a BlockingCollection, and in consumeTask I read the data as it appears in the BlockingCollection and write it to a file. Since, by default, BlockingCollection wraps a ConcurrentQueue, I expect reads from the collection to come out in the same order the items were added. But when I compare the destination file with the source, it is totally different, and sometimes I see duplicates.
My source file is just a sequence of numbers, one per line, like below:

1
2
3
4
5
6
7
8
9
10

My file has around 5000 numbers so that it is large enough. Is there something wrong with this code, or is this not the way BlockingCollection is supposed to work? In this example I am writing to a file, but in reality I need to push this file to a REST API, and it is important for the data to be sent in sequence. If the bytes can't be sent in order, the file will be corrupt when it is stored on the server.

static void Main(string[] args)
    {
        BlockingCollection<byte[]> bc = new BlockingCollection<byte[]>(10);

        Task consumeTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenWrite(@"C:\Temp\pass_new.txt");
            foreach (byte[] data in bc.GetConsumingEnumerable())
            {
                fs.Write(data, 0, data.Length);
            }
            fs.Close();
        });

        Task readTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenRead(@"C:\Temp\pass.txt");
            var bufferSize = 4096;
            var buffer = new byte[bufferSize];
            int bytesRead = 0;
            while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
            {
                byte[] dataToWrite = buffer;
                if (bytesRead < bufferSize)
                {
                    dataToWrite = new byte[bytesRead];
                    Array.Copy(buffer, dataToWrite, bytesRead);
                }
                bc.Add(dataToWrite);
            }
            fs.Close();
        }).ContinueWith(ant => bc.CompleteAdding());

        consumeTask.Wait();

    }

Solution

I believe it is because you are recycling the buffer when bytesRead == bufferSize. Imagine these two sequences (I use the term "pointer" loosely here to refer to a reference variable; I think it gets the point across better).

First, when the read is smaller than the buffer size:

  1. you make buffer point at a new byte array in memory 4096 in size.
  2. fs.Read writes 20 bytes to the object buffer points at.
  3. you make dataToWrite point at the same object as buffer.
  4. you make dataToWrite point at a new byte array 20 bytes in size.
  5. you copy 20 bytes from the object buffer points at to the object dataToWrite points at.
  6. you put a pointer to the object dataToWrite points at in the blocking collection.
  7. fs.Read writes 30 bytes to the object buffer points at.

Now compare that to what happens if you meet the buffer size.

  1. you make buffer point at a new byte array in memory 4096 in size.
  2. fs.Read writes 4096 bytes to the object buffer points at.
  3. you make dataToWrite point at the same object as buffer.
  4. you put a pointer to the object dataToWrite points at in the blocking collection.
  5. fs.Read writes 30 bytes to the object buffer points at.

Because dataToWrite, buffer, and the item you added to the blocking collection all point at the same object the last fs.Read will modify the byte array that was just stored in the collection.
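The aliasing can be reproduced in a few lines. This is a standalone sketch (not the original program) showing that assigning one array variable to another copies the reference, not the bytes:

```csharp
using System;

class AliasDemo
{
    static void Main()
    {
        var buffer = new byte[4];
        buffer[0] = 1;

        // "dataToWrite = buffer" copies the reference, not the data;
        // this stands in for the item put into the BlockingCollection.
        byte[] stored = buffer;

        // The next "fs.Read" overwrites the same object the collection holds.
        buffer[0] = 99;

        Console.WriteLine(stored[0]);  // prints 99, not 1
    }
}
```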

Drop your if statement and always allocate a new dataToWrite, and your program should work fine.
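A corrected version of the producer loop might look like the sketch below (file paths are the ones from the question; the surrounding program mirrors the question's structure). The only substantive change is that every iteration copies the bytes into a fresh array before adding it to the collection:

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

class FixedCopy
{
    static void Main()
    {
        var bc = new BlockingCollection<byte[]>(10);

        Task consumeTask = Task.Run(() =>
        {
            using (var outFs = File.OpenWrite(@"C:\Temp\pass_new.txt"))
                foreach (byte[] data in bc.GetConsumingEnumerable())
                    outFs.Write(data, 0, data.Length);
        });

        using (var fs = File.OpenRead(@"C:\Temp\pass.txt"))
        {
            var buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
            {
                // Always allocate a fresh array so the item stored in the
                // collection is never aliased by the shared read buffer.
                var dataToWrite = new byte[bytesRead];
                Array.Copy(buffer, dataToWrite, bytesRead);
                bc.Add(dataToWrite);
            }
        }
        bc.CompleteAdding();
        consumeTask.Wait();
    }
}
```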

Other tips

I would try Take() (rather than .GetConsumingEnumerable()).
GetConsumingEnumerable should still work.
And even though it is the default, I would pass a ConcurrentQueue explicitly.

See the documentation

BlockingCollection.Take Method

The order in which an item is removed depends on the type of collection used to create the BlockingCollection instance. When you create a BlockingCollection object, you can specify the type of collection to use. For example, you could specify a ConcurrentQueue object for first in, first out (FIFO) behavior, or a ConcurrentStack object for last in, first out (LIFO) behavior.

As for duplicates. I have used it a lot and never known a BlockingCollection produce duplicates. I use it to feed a database where a duplicate would violate a constraint.
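Both suggestions can be sketched in a few lines: constructing the BlockingCollection with an explicit ConcurrentQueue backing store (which is also what the parameterless constructor uses) and draining it with Take(). This is a minimal single-threaded illustration, not the question's program:

```csharp
using System;
using System.Collections.Concurrent;

class TakeDemo
{
    static void Main()
    {
        // Explicitly back the BlockingCollection with a ConcurrentQueue
        // for FIFO ordering; 10 is the bounded capacity.
        var bc = new BlockingCollection<int>(new ConcurrentQueue<int>(), 10);

        for (int i = 1; i <= 3; i++) bc.Add(i);
        bc.CompleteAdding();

        // Take() removes one item at a time, in FIFO order here.
        while (!bc.IsCompleted)
            Console.WriteLine(bc.Take());  // prints 1, 2, 3
    }
}
```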

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow