Question

My goal is to retrieve data from a data source, add some metadata to it and insert it to another target.

The target has schema with four more columns then the source (calculated columns).

I am using SqlBulkCopy, which requires a reader with all columns (including the 4 calculated).

Is there a way to add columns to the DataReader manually? or if it's not possible what alternatives i have for the data insertion?

Was it helpful?

Solution 3

I needed to do this, and also have the ability to make columns based on other columns and have the column's value depend on the row index of the reader. This class implements IWrappedReader from Dapper if thats useful to you (it was for me). The class isn't complete as I didn't implement all the IDataRecord fields but you can look at how I did IDataRecord.GetInt32 to see the simple pattern.

/// <inheritdoc />
public class WrappedDataReader : IDataReader
{
    private readonly IList<AdditionalField> _additionalFields;
    private readonly int _originalOrdinalCount;
    private IDbCommand _cmd;
    private int _currentRowIndex = -1; //The first Read() will make this 0
    private IDataReader _underlyingReader;

    public WrappedDataReader(IDataReader underlyingReader, IList<AdditionalField> additionalFields)
    {
        _additionalFields = additionalFields;
        _underlyingReader = underlyingReader;

        var schema = Reader.GetSchemaTable();
        if (schema == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        _originalOrdinalCount = schema.Rows.Count;
    }

    public object this[int i]
    {
        get { throw new NotImplementedException(); }
    }

    public IDataReader Reader
    {
        get
        {
            if (_underlyingReader == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            return _underlyingReader;
        }
    }

    IDbCommand IWrappedDataReader.Command
    {
        get
        {
            if (_cmd == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            return _cmd;
        }
    }

    void IDataReader.Close() => _underlyingReader?.Close();

    int IDataReader.Depth => Reader.Depth;

    DataTable IDataReader.GetSchemaTable()
    {
        var rv = Reader.GetSchemaTable();
        if (rv == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        for (var i = 0; i < _additionalFields.Count; i++)
        {
            var row = rv.NewRow();

            row["ColumnName"] = _additionalFields[i].ColumnName;
            row["ColumnOrdinal"] = GetAppendColumnOrdinal(i);
            row["DataType"] = _additionalFields[i].DataType;

            rv.Rows.Add(row);
        }

        return rv;
    }

    bool IDataReader.IsClosed => _underlyingReader?.IsClosed ?? true;

    bool IDataReader.NextResult() => Reader.NextResult();

    bool IDataReader.Read()
    {
        _currentRowIndex++;
        return Reader.Read();
    }

    int IDataReader.RecordsAffected => Reader.RecordsAffected;

    void IDisposable.Dispose()
    {
        _underlyingReader?.Close();
        _underlyingReader?.Dispose();
        _underlyingReader = null;
        _cmd?.Dispose();
        _cmd = null;
    }

    int IDataRecord.FieldCount => Reader.FieldCount + _additionalFields.Count;

    bool IDataRecord.GetBoolean(int i) => Reader.GetBoolean(i);

    byte IDataRecord.GetByte(int i) => Reader.GetByte(i);

    long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) =>
        Reader.GetBytes(i, fieldOffset, buffer, bufferoffset, length);

    char IDataRecord.GetChar(int i) => Reader.GetChar(i);

    long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) =>
        Reader.GetChars(i, fieldoffset, buffer, bufferoffset, length);

    IDataReader IDataRecord.GetData(int i) => Reader.GetData(i);

    string IDataRecord.GetDataTypeName(int i) => Reader.GetDataTypeName(i);

    DateTime IDataRecord.GetDateTime(int i) => Reader.GetDateTime(i);

    decimal IDataRecord.GetDecimal(int i) => Reader.GetDecimal(i);

    double IDataRecord.GetDouble(int i) => Reader.GetDouble(i);

    Type IDataRecord.GetFieldType(int i) => Reader.GetFieldType(i);

    float IDataRecord.GetFloat(int i) => Reader.GetFloat(i);

    Guid IDataRecord.GetGuid(int i) => Reader.GetGuid(i);

    short IDataRecord.GetInt16(int i) => Reader.GetInt16(i);

    int IDataRecord.GetInt32(int i)
    {
        return i >= _originalOrdinalCount ? (int) ExecuteAdditionalFieldFunc(i) : Reader.GetInt32(i);
    }

    long IDataRecord.GetInt64(int i) => Reader.GetInt64(i);

    string IDataRecord.GetName(int i)
    {
        return i >= _originalOrdinalCount ? _additionalFields[GetAppendColumnIndex(i)].ColumnName : Reader.GetName(i);
    }

    int IDataRecord.GetOrdinal(string name)
    {
        for (var i = 0; i < _additionalFields.Count; i++)
        {
            if (name.Equals(_additionalFields[i].ColumnName, StringComparison.OrdinalIgnoreCase))
            {
                return GetAppendColumnOrdinal(i);
            }
        }

        return Reader.GetOrdinal(name);
    }

    string IDataRecord.GetString(int i) => Reader.GetString(i);

    object IDataRecord.GetValue(int i)
    {
        return i >= _originalOrdinalCount ? ExecuteAdditionalFieldFunc(i) : Reader.GetValue(i);
    }

    int IDataRecord.GetValues(object[] values) => Reader.GetValues(values);

    bool IDataRecord.IsDBNull(int i)
    {
        return i >= _originalOrdinalCount ? ExecuteAdditionalFieldFunc(i) == null : Reader.IsDBNull(i);
    }

    object IDataRecord.this[string name]
    {
        get
        {
            var ordinal = ((IDataRecord) this).GetOrdinal(name);
            return ((IDataRecord) this).GetValue(ordinal);
        }
    }


    object IDataRecord.this[int i] => ((IDataRecord) this).GetValue(i);

    private int GetAppendColumnOrdinal(int index)
    {
        return _originalOrdinalCount + index;
    }

    private int GetAppendColumnIndex(int oridinal)
    {
        return oridinal - _originalOrdinalCount;
    }

    private object ExecuteAdditionalFieldFunc(int oridinal)
    {
        return _additionalFields[GetAppendColumnIndex(oridinal)].Func(_currentRowIndex, Reader);
    }

    public struct AdditionalField
    {
        public AdditionalField(string columnName, Type dataType, Func<int, IDataReader, object> func = null)
        {
            ColumnName = columnName;
            DataType = dataType;
            Func = func;
        }

        public string ColumnName;
        public Type DataType;
        public Func<int, IDataReader, object> Func;
    }
}

Here is a quick NUnit test I wrote to show case its usage

[Test]
public void ReaderTest()
{
    using (var conn = new SqlConnection(ConnectionSettingsCollection.Default))
    {
        conn.Open();
        const string sql = @"
            SELECT 1 as OriginalField
            UNION
            SELECT -500 as OriginalField
            UNION
            SELECT 100 as OriginalField
        ";

        var additionalFields = new[]
        {
            new WrappedDataReader.AdditionalField("StaticField", typeof(int), delegate { return "X"; }),
            new WrappedDataReader.AdditionalField("CounterField", typeof(int), (i, reader) => i),
            new WrappedDataReader.AdditionalField("ComputedField", typeof(int), (i, reader) => (int) reader["OriginalField"] + 1000)
        };

        const string expectedJson = @"
            [
                {""OriginalField"":-500,""StaticField"":""X"",""CounterField"":0,""ComputedField"":500},
                {""OriginalField"":1,   ""StaticField"":""X"",""CounterField"":1,""ComputedField"":1001},
                {""OriginalField"":100, ""StaticField"":""X"",""CounterField"":2,""ComputedField"":1100}
            ]
        ";

        var actualJson = ToJson(new WrappedDataReader(new SqlCommand(sql, conn).ExecuteReader(), additionalFields));

        Assert.Zero(CultureInfo.InvariantCulture.CompareInfo.Compare(expectedJson, actualJson, CompareOptions.IgnoreSymbols));
    }
}

private static string ToJson(IDataReader reader)
{
    using (var strWriter = new StringWriter(new StringBuilder()))
    using (var jsonWriter = new JsonTextWriter(strWriter))
    {
        jsonWriter.WriteStartArray();

        while (reader.Read())
        {
            jsonWriter.WriteStartObject();

            for (var i = 0; i < reader.FieldCount; i++)
            {
                jsonWriter.WritePropertyName(reader.GetName(i));
                jsonWriter.WriteValue(reader[i]);
            }

            jsonWriter.WriteEndObject();
        }

        jsonWriter.WriteEndArray();

        return strWriter.ToString();
    }
}

OTHER TIPS

A DataReader is a read-only construct and so it can't be modified. You can use a DataTable instead.

It is possible

  • Create your own class that implements the IDataReader interface
  • Add an existing DataReader in your class constructor
  • override the interface as needed to return the result from your Base DataReader or return your own calculated values

Just to get an idea, this could be a simple implementation (I skipped most methods)

public class WrapperDataReader : IDataReader
{
    private IDataReader reader;

    public WrapperDataReader(IDataReader reader)
    {
        this.reader = reader;
    }

    public void Close()
    {
        reader.Close();
    }

    public int Depth
    {
        get { return reader.Depth; }
    }

    public DataTable GetSchemaTable()
    {
        var schemaTable = reader.GetSchemaTable();
        // add your computed column to the schema table
        schemaTable.Rows.Add(...);
        return schemaTable;
    }

    public bool GetBoolean(int i)
    {
        return reader.GetBoolean(i);
    }

    public int GetOrdinal(string name)
    {
        if (name.Equals("displayName", StringComparison.InvariantCultureIgnoreCase))
            return 15;
        return reader.GetOrdinal(name);
    }

    public string GetString(int i)
    {
        if (i == 15)
            return String.Format("{0}, {1}", GetString(1), GetString(2)); // lastname, firstname
        return reader.GetString(i);
    }

}

Update

Since you are propably using the WriteToServer method, you can use the overload that takes a DataTable instead.

        var connectionString = "...";
        var copy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.Default);
        copy.DestinationTableName = "Customers";

        var reader = new SqlDataReader();
        var table = new DataTable();
        table.Load(reader);
        table.Columns.Add("DisplayName", typeof(string), "lastname, firstname");
        table.Columns.Add("CustomerCode", typeof(string));

        foreach (DataRow row in table.Rows)
            row["CustomerCode"] = ((int)row["id"] + 10000).ToString();

        copy.WriteToServer(table);

Datareader is only for reading the data. You cannot modify its schema or values

Dataset/DataTable is meant for this purpose.

I've found another possible solution, that I'm currently using:

  1. Create the following objects from your existing DataReader:
    IEnumerable<object[]> - which represents the data
    List<Column> - which represents the columns for the data DataReader.

  2. Modify the data and add the the extra columns

  3. Create a method AsDataReader(IEnumerable<object[]> updatedData,updatedColumns List<Column>) which gets the updates objects and returns* modified DataReader

    • I used yield return to improve performance.
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top