Question

The setup

I have a pipeline processing many similar Tasks. It has three segments in order, A, then B, then C (each of which may have multiple stages). A and C are local to the machine, B is remote.

[Figure: three-stage pipeline; the middle segment, B, is remote]

Tasks in section B, the remote section, need a connection to work. They should all share one connection.

The question

I want to create the connection when the first task moves from A to B, and close it when the last task moves from B to C. How do I do this?

The constraints

If it makes the solution easier I can accept one or more of the following constraints:

  • Segment B has only one stage.
  • Only one task in segment B at a time.
  • Queues (buffers) or any other block inserted between A and B or between B and C.

Solution

This is how I would do it:

  • Have a connection variable somewhere initially set to null.
  • Whenever an item is processed, the connection is created if it does not exist yet, and then used.
  • After the block has completed (which you can detect through its Completion property, assuming you propagate completion properly), Dispose() the connection, if it was ever created.

In code, as a helper method, it could look something like this:

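// Requires: using System; using System.Threading.Tasks.Dataflow;
public static TransformBlock<TInput, TOutput>
    CreateTransformBlockWithConnection<TInput, TOutput, TConnection>(
    Func<TInput, TConnection, TOutput> transform,
    Func<TConnection> connectionFactory)
    where TConnection : class, IDisposable
{
    // Shared by all items; created lazily when the first item arrives.
    TConnection connection = null;

    // With the default options the block processes one item at a time,
    // so the unsynchronized null check below is safe.
    var block = new TransformBlock<TInput, TOutput>(
        input =>
        {
            if (connection == null)
                connection = connectionFactory();

            return transform(input, connection);
        });

    // Runs once the block has finished (successfully, faulted or canceled).
    block.Completion.ContinueWith(
        _ =>
        {
            if (connection != null)
                connection.Dispose();
        });

    return block;
}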

(Note that this won't behave correctly if connectionFactory ever returns null. If you're worried about that, you can add a check for it.)
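If the null case or concurrent processing is a concern, one possible variation (a sketch, not part of the original answer) wraps the factory in Lazy<T>, so that it runs at most once and a null result is rejected immediately. The names ConnectionBlocks and CreateTransformBlockWithLazyConnection and the optional options parameter are made up for illustration, and the throw expression assumes C# 7.0 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ConnectionBlocks
{
    // Hypothetical variant of the helper above: same idea, but the
    // connection is created through Lazy<T>.
    public static TransformBlock<TInput, TOutput>
        CreateTransformBlockWithLazyConnection<TInput, TOutput, TConnection>(
        Func<TInput, TConnection, TOutput> transform,
        Func<TConnection> connectionFactory,
        ExecutionDataflowBlockOptions options = null)
        where TConnection : class, IDisposable
    {
        // ExecutionAndPublication guarantees the factory runs at most once,
        // even if the block is configured with MaxDegreeOfParallelism > 1.
        var connection = new Lazy<TConnection>(
            () => connectionFactory()
                ?? throw new InvalidOperationException("connectionFactory returned null."),
            LazyThreadSafetyMode.ExecutionAndPublication);

        var block = new TransformBlock<TInput, TOutput>(
            input => transform(input, connection.Value),
            options ?? new ExecutionDataflowBlockOptions());

        // Dispose only if some item actually caused the connection to be created.
        block.Completion.ContinueWith(_ =>
        {
            if (connection.IsValueCreated)
                connection.Value.Dispose();
        });

        return block;
    }
}
```

Lazy<T> only protects the creation of the connection. If the block runs items in parallel, the connection type itself still has to tolerate concurrent use.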

Usage example (this executes each string input as a SQL command and returns the result):

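// connectionString is a placeholder; ExecuteScalar() needs an open connection.
var block = CreateTransformBlockWithConnection(
    (string input, SqlConnection connection) =>
        new SqlCommand(input, connection).ExecuteScalar(),
    () => { var c = new SqlConnection(connectionString); c.Open(); return c; });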
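To show where "propagate completion properly" comes in, here is a minimal sketch of the whole A, B, C pipeline from the question. FakeConnection, PipelineSketch and RunAsync are hypothetical names standing in for a real remote connection and real work, and the helper is repeated in condensed form only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical stand-in for a real remote connection.
    public sealed class FakeConnection : IDisposable
    {
        public string Send(string request) => "remote:" + request;
        public void Dispose() { }
    }

    // Condensed copy of the helper from the answer.
    static TransformBlock<TInput, TOutput>
        CreateTransformBlockWithConnection<TInput, TOutput, TConnection>(
        Func<TInput, TConnection, TOutput> transform,
        Func<TConnection> connectionFactory)
        where TConnection : class, IDisposable
    {
        TConnection connection = null;
        var block = new TransformBlock<TInput, TOutput>(
            input => transform(input, connection ?? (connection = connectionFactory())));
        block.Completion.ContinueWith(_ => connection?.Dispose());
        return block;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<int> inputs)
    {
        var results = new List<string>();
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Segment A (local), segment B (remote, shares one connection), segment C (local).
        var a = new TransformBlock<int, string>(n => "task" + n);
        var b = CreateTransformBlockWithConnection(
            (string s, FakeConnection conn) => conn.Send(s),
            () => new FakeConnection());
        var c = new ActionBlock<string>(s => results.Add(s));

        // Completion flows A -> B -> C, so B completes (and its connection is
        // disposed) only after A is done and B has handed off its last output.
        a.LinkTo(b, propagate);
        b.LinkTo(c, propagate);

        foreach (var n in inputs)
            a.Post(n);
        a.Complete();

        await c.Completion;
        return results;
    }
}
```

Calling PipelineSketch.RunAsync(new[] { 1, 2, 3 }) yields "remote:task1", "remote:task2", "remote:task3". Without PropagateCompletion on the link from A to B, block B would never complete and the connection would never be disposed.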
Licensed under: CC-BY-SA with attribution