admin管理员组

文章数量:1122846

I am trying to upload a million records from an Oracle database to SQL Server data warehouse. I've used similar code previously to upload/sync data from SQL Server and a Synergex db via ODBC with great success. However, with Oracle I've hit a roadblock. I keep getting the following error.

Microsoft.Data.SqlClient.SqlException: 'The incoming tabular data stream (TDS) remote procedure call (RPC) protocol stream is incorrect. Table-valued parameter 1 (""), row 3, column 2: Data type 0x2A has an invalid data length or metadata length.
The data for table-valued parameter "@UserHistory" doesn't conform to the table type of the parameter. SQL Server error is: 8037, state: 83 The statement has been terminated.'

The code is

/// <summary>
/// Runs the Oracle procedure <c>WBH_DATAWAREHOUSE.WBH_UserHistory</c> and passes its
/// ref-cursor reader to the SQL Server procedure <c>[Synapse].[InsertUserHistory]</c>
/// as the <c>@UserHistory</c> table-valued parameter, then logs the elapsed time.
/// </summary>
/// <param name="config">Configuration that supplies the "SynapseProd" (Oracle) and
/// "powerbi" (SQL Server) connection strings.</param>
/// <returns>A task that completes when the transfer has finished or failed.</returns>
/// <remarks>
/// Failures are logged and swallowed, so the returned task does not fault.
/// Every connection, command and reader is disposed on both the success and the failure path.
/// </remarks>
public static async Task StreamUserHistoryToDWAsync(IConfigurationRoot config)
{
    try
    {
        var watch = System.Diagnostics.Stopwatch.StartNew();

        string query = @"WBH_DATAWAREHOUSE.WBH_UserHistory";
        string insertQuery = "[Synapse].[InsertUserHistory]";

        using (OracleConnection oraCn = new OracleConnection(config.GetConnectionString("SynapseProd")))
        {
            await oraCn.OpenAsync();

            using (OracleCommand cmd = new OracleCommand(query, oraCn))
            {
                cmd.CommandType = CommandType.StoredProcedure;

                cmd.Parameters.Add(new OracleParameter("MaxBegtime", GetMaxUserHistory(config)));
                cmd.Parameters.Add(new OracleParameter("rcUserHistory", OracleDbType.RefCursor, ParameterDirection.Output));

                using (OracleDataReader odr = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection))
                using (SqlConnection sqlCn = new SqlConnection(config.GetConnectionString("powerbi")))
                using (SqlCommand sqlCmd = new SqlCommand(insertQuery, sqlCn))
                {
                    sqlCmd.CommandType = CommandType.StoredProcedure;

                    await sqlCn.OpenAsync();

                    // NOTE(review): the Oracle reader is handed to the TVP as-is. This is the call for
                    // which SQL Server reports error 8037 once the result has three or more rows; the
                    // resolution in this thread streams SqlDataRecord rows built from explicit
                    // SqlMetaData instead of passing the reader directly.
                    SqlParameter tvp = new SqlParameter("@UserHistory", odr);
                    tvp.SqlDbType = SqlDbType.Structured;

                    SqlParameter rtn = new SqlParameter("@rtn_result", SqlDbType.Int);
                    rtn.Direction = ParameterDirection.Output;

                    sqlCmd.Parameters.Add(tvp);
                    sqlCmd.Parameters.Add(rtn);

                    await sqlCmd.ExecuteNonQueryAsync();
                }
            }
        }

        watch.Stop();
        var elapsed = watch.ElapsedMilliseconds;

        Console.WriteLine(elapsed.ToString());

        logger.Info("User History elapsed time: {time}", elapsed.ToString());
    }
    catch (Exception ex)
    {
        // Best-effort job: record the failure instead of propagating it to the caller.
        logger.Error(ex, "StreamUserHistoryToDW Error");
    }
}

The parameter @UserHistory is a table-valued parameter. Its column types match exactly what comes from Oracle. The stored procedure simply inserts the data. I'm not sure the procedure is needed for troubleshooting, because I still get the error after commenting out its body so that it does nothing but return a value. The issue seems to be with the table-valued parameter, not the procedure.

-- Table type for the @UserHistory table-valued parameter discussed in this post.
-- Rows sent from the client must match these columns in order and type.
CREATE TYPE [Synapse].[UserHistoryTableType] AS TABLE
(
    -- Required columns: NULL is not accepted here.
    [NAMEID] [varchar](12) NOT NULL,
    [BEGTIME] [datetime] NOT NULL,
    [EVENT] [varchar](4) NOT NULL,
    -- Every remaining column is nullable.
    [ENDTIME] [datetime] NULL,
    [FACILITY] [varchar](3) NULL,
    [CUSTID] [varchar](10) NULL,
    [EQUIPMENT] [varchar](2) NULL,
    [UNITS] [int] NULL,
    -- The only Unicode (nvarchar) text column in the type.
    [ETC] [nvarchar](255) NULL,
    [ORDERID] [int] NULL,
    [SHIPID] [smallint] NULL,
    [LOCATION] [varchar](10) NULL,
    [LPID] [varchar](20) NULL,
    [ITEM] [varchar](50) NULL,
    [UOM] [varchar](4) NULL,
    [BASEUOM] [varchar](4) NULL,
    [BASEUNITS] [int] NULL,
    -- Exact numerics: each column declares its own precision and scale.
    [CUBE] [decimal](10, 4) NULL,
    [WEIGHT] [decimal](17, 8) NULL,
    [EMPLOYEECOST] [decimal](10, 2) NULL,
    [EQUIPMENTCOST] [decimal](10, 2) NULL
)

So, the interesting thing is if I limit the query results to two rows or less, the stored procedure saves the data in SQL. If I try to do three rows, I get the error. Doesn't matter what data I use, so it's not something "wrong" with the data.

I am using Oracle.ManagedDataAccess.Core 23.6.1 with .NET 8.0.

Do I need to find a different way to upload the data to SQL Server? Anything you'd recommend?

Thanks

I am trying to upload a million records from an Oracle database to SQL Server data warehouse. I've used similar code previously to upload/sync data from SQL Server and a Synergex db via ODBC with great success. However, with Oracle I've hit a roadblock. I keep getting the following error.

Microsoft.Data.SqlClient.SqlException: 'The incoming tabular data stream (TDS) remote procedure call (RPC) protocol stream is incorrect. Table-valued parameter 1 (""), row 3, column 2: Data type 0x2A has an invalid data length or metadata length.
The data for table-valued parameter "@UserHistory" doesn't conform to the table type of the parameter. SQL Server error is: 8037, state: 83 The statement has been terminated.'

The code is

/// <summary>
/// Runs the Oracle procedure <c>WBH_DATAWAREHOUSE.WBH_UserHistory</c> and passes its
/// ref-cursor reader to the SQL Server procedure <c>[Synapse].[InsertUserHistory]</c>
/// as the <c>@UserHistory</c> table-valued parameter, then logs the elapsed time.
/// </summary>
/// <param name="config">Configuration that supplies the "SynapseProd" (Oracle) and
/// "powerbi" (SQL Server) connection strings.</param>
/// <returns>A task that completes when the transfer has finished or failed.</returns>
/// <remarks>
/// Failures are logged and swallowed, so the returned task does not fault.
/// Every connection, command and reader is disposed on both the success and the failure path.
/// </remarks>
public static async Task StreamUserHistoryToDWAsync(IConfigurationRoot config)
{
    try
    {
        var watch = System.Diagnostics.Stopwatch.StartNew();

        string query = @"WBH_DATAWAREHOUSE.WBH_UserHistory";
        string insertQuery = "[Synapse].[InsertUserHistory]";

        using (OracleConnection oraCn = new OracleConnection(config.GetConnectionString("SynapseProd")))
        {
            await oraCn.OpenAsync();

            using (OracleCommand cmd = new OracleCommand(query, oraCn))
            {
                cmd.CommandType = CommandType.StoredProcedure;

                cmd.Parameters.Add(new OracleParameter("MaxBegtime", GetMaxUserHistory(config)));
                cmd.Parameters.Add(new OracleParameter("rcUserHistory", OracleDbType.RefCursor, ParameterDirection.Output));

                using (OracleDataReader odr = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection))
                using (SqlConnection sqlCn = new SqlConnection(config.GetConnectionString("powerbi")))
                using (SqlCommand sqlCmd = new SqlCommand(insertQuery, sqlCn))
                {
                    sqlCmd.CommandType = CommandType.StoredProcedure;

                    await sqlCn.OpenAsync();

                    // NOTE(review): the Oracle reader is handed to the TVP as-is. This is the call for
                    // which SQL Server reports error 8037 once the result has three or more rows; the
                    // resolution in this thread streams SqlDataRecord rows built from explicit
                    // SqlMetaData instead of passing the reader directly.
                    SqlParameter tvp = new SqlParameter("@UserHistory", odr);
                    tvp.SqlDbType = SqlDbType.Structured;

                    SqlParameter rtn = new SqlParameter("@rtn_result", SqlDbType.Int);
                    rtn.Direction = ParameterDirection.Output;

                    sqlCmd.Parameters.Add(tvp);
                    sqlCmd.Parameters.Add(rtn);

                    await sqlCmd.ExecuteNonQueryAsync();
                }
            }
        }

        watch.Stop();
        var elapsed = watch.ElapsedMilliseconds;

        Console.WriteLine(elapsed.ToString());

        logger.Info("User History elapsed time: {time}", elapsed.ToString());
    }
    catch (Exception ex)
    {
        // Best-effort job: record the failure instead of propagating it to the caller.
        logger.Error(ex, "StreamUserHistoryToDW Error");
    }
}

The parameter @UserHistory is a table-valued parameter. Its column types match exactly what comes from Oracle. The stored procedure simply inserts the data. I'm not sure the procedure is needed for troubleshooting, because I still get the error after commenting out its body so that it does nothing but return a value. The issue seems to be with the table-valued parameter, not the procedure.

-- Table type for the @UserHistory table-valued parameter discussed in this post.
-- Rows sent from the client must match these columns in order and type.
CREATE TYPE [Synapse].[UserHistoryTableType] AS TABLE
(
    -- Required columns: NULL is not accepted here.
    [NAMEID] [varchar](12) NOT NULL,
    [BEGTIME] [datetime] NOT NULL,
    [EVENT] [varchar](4) NOT NULL,
    -- Every remaining column is nullable.
    [ENDTIME] [datetime] NULL,
    [FACILITY] [varchar](3) NULL,
    [CUSTID] [varchar](10) NULL,
    [EQUIPMENT] [varchar](2) NULL,
    [UNITS] [int] NULL,
    -- The only Unicode (nvarchar) text column in the type.
    [ETC] [nvarchar](255) NULL,
    [ORDERID] [int] NULL,
    [SHIPID] [smallint] NULL,
    [LOCATION] [varchar](10) NULL,
    [LPID] [varchar](20) NULL,
    [ITEM] [varchar](50) NULL,
    [UOM] [varchar](4) NULL,
    [BASEUOM] [varchar](4) NULL,
    [BASEUNITS] [int] NULL,
    -- Exact numerics: each column declares its own precision and scale.
    [CUBE] [decimal](10, 4) NULL,
    [WEIGHT] [decimal](17, 8) NULL,
    [EMPLOYEECOST] [decimal](10, 2) NULL,
    [EQUIPMENTCOST] [decimal](10, 2) NULL
)

So, the interesting thing is if I limit the query results to two rows or less, the stored procedure saves the data in SQL. If I try to do three rows, I get the error. Doesn't matter what data I use, so it's not something "wrong" with the data.

I am using Oracle.ManagedDataAccess.Core 23.6.1 with .NET 8.0.

Do I need to find a different way to upload the data to SQL Server? Anything you'd recommend?

Thanks

Share Improve this question edited Nov 22, 2024 at 0:52 MT0 168k11 gold badges66 silver badges127 bronze badges asked Nov 21, 2024 at 18:01 David GerstDavid Gerst 7314 bronze badges 4
  • Try datetime2 instead of datetime perhaps? – siggemannen Commented Nov 21, 2024 at 18:11
  • I actually did try that already. same exact issue. I don't have the page handy, but one of the google searches I did someone said this error is not on the column specified in the error, but -3 from there. wish I had saved it. but if that's the case, it would be end of line or the previous row? – David Gerst Commented Nov 21, 2024 at 18:35
  • I got it to work. I defined the SQLMetadata per the answer to this question. stackoverflow.com/questions/55804423/… – David Gerst Commented Nov 21, 2024 at 23:08
  • 1 This is good. But if you used SqlBulkCopy instead of a TVP, this would all be simpler. You wouldn't need to declare a table type, and you could pass the DataReader directly to SqlBulkCopy. learn.microsoft.com/en-us/dotnet/api/… You wouldn't even have to buffer the Oracle results in your program if you don't want to. – David Browne - Microsoft Commented Nov 22, 2024 at 15:25
Add a comment  | 

1 Answer 1

Reset to default 0

I got my code to work; it now looks like the code below. The question "How do I specify to use server defined values in a Table Valued Parameter when my source is a DataReader?" pointed me in the right direction.

    /// <summary>
    /// Streams the result of the Oracle procedure <c>WBH_DATAWAREHOUSE.WBH_UserHistory</c> into the
    /// SQL Server procedure <c>[Synapse].[InsertUserHistory]</c> through the <c>@UserHistory</c>
    /// table-valued parameter, then logs the elapsed time in milliseconds.
    /// </summary>
    /// <param name="config">Configuration that supplies the "powerbi" (SQL Server) and
    /// "SynapseProd" (Oracle) connection strings.</param>
    /// <returns>A task that completes when the transfer has finished or failed.</returns>
    /// <remarks>
    /// Failures are logged and swallowed, so the returned task does not fault.
    /// When the Oracle cursor returns no rows the insert procedure is not called.
    /// </remarks>
    public static async Task StreamUserHistoryToDWAsync3(IConfigurationRoot config)
    {
        try
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();

            string query = @"WBH_DATAWAREHOUSE.WBH_UserHistory";
            string insertQuery = "[Synapse].[InsertUserHistory]";

            using (SqlConnection sqlCn = new SqlConnection(config.GetConnectionString("powerbi")))
            using (OracleConnection oraCn = new OracleConnection(config.GetConnectionString("SynapseProd")))
            {
                await sqlCn.OpenAsync();
                await oraCn.OpenAsync();

                using (OracleCommand oraCmd = new OracleCommand(query, oraCn))
                using (SqlCommand sqlCmd = new SqlCommand(insertQuery, sqlCn))
                {
                    oraCmd.CommandType = CommandType.StoredProcedure;
                    oraCmd.Parameters.Add(new OracleParameter("MaxBegtime", GetMaxUserHistory(config)));
                    oraCmd.Parameters.Add(new OracleParameter("rcUserHistory", OracleDbType.RefCursor, ParameterDirection.Output));

                    sqlCmd.CommandType = CommandType.StoredProcedure;

                    // Open the cursor asynchronously so this async method does not block its thread.
                    using (OracleDataReader odr = await oraCmd.ExecuteReaderAsync())
                    {
                        // SqlClient rejects a table-valued parameter whose SqlDataRecord enumeration
                        // yields no rows, so there is nothing to send when the cursor is empty.
                        if (odr.HasRows)
                        {
                            SqlParameter tvp = new SqlParameter("@UserHistory", SendRowsToProc(odr));
                            tvp.SqlDbType = SqlDbType.Structured;
                            tvp.TypeName = "[Synapse].[UserHistoryTableType]";

                            SqlParameter rtn = new SqlParameter("@rtn_result", SqlDbType.Int);
                            rtn.Direction = ParameterDirection.Output;

                            sqlCmd.Parameters.Add(tvp);
                            sqlCmd.Parameters.Add(rtn);

                            // The TVP rows are pulled lazily from the Oracle reader while this command runs.
                            await sqlCmd.ExecuteNonQueryAsync();
                        }
                    }
                }
            }

            watch.Stop();
            var elapsed = watch.ElapsedMilliseconds;

            Console.WriteLine(elapsed.ToString());

            logger.Info("User History elapsed time: {time}", elapsed.ToString());
        }
        catch (Exception ex)
        {
            // Best-effort job: record the failure instead of propagating it to the caller.
            logger.Error(ex, "StreamUserHistoryToDW Error");
        }
    }


/// <summary>
/// Lazily converts the rows of <paramref name="reader"/> into <see cref="SqlDataRecord"/> values
/// whose column metadata mirrors <c>[Synapse].[UserHistoryTableType]</c>, for use as the value of
/// a structured (table-valued) <c>SqlParameter</c>.
/// </summary>
/// <param name="reader">Open Oracle reader; its column names must match the metadata names below.</param>
/// <returns>
/// One record per successfully converted source row. The same <see cref="SqlDataRecord"/> instance
/// is reused for every row, so consumers must not hold on to a yielded record.
/// </returns>
/// <remarks>
/// A row whose values cannot be converted is logged and skipped rather than sent with
/// stale or partial values left over from the previous row.
/// </remarks>
private static IEnumerable<SqlDataRecord> SendRowsToProc(OracleDataReader reader)
{
    if (!reader.HasRows)
    {
        yield break;
    }

    // Keep this list in the same order, with the same types, as [Synapse].[UserHistoryTableType].
    SqlDataRecord resultRow = new SqlDataRecord(new SqlMetaData[] {
        new SqlMetaData("NAMEID", SqlDbType.VarChar, 12),
        new SqlMetaData("BEGTIME", SqlDbType.DateTime),
        new SqlMetaData("EVENT", SqlDbType.VarChar, 4),
        new SqlMetaData("ENDTIME", SqlDbType.DateTime),
        new SqlMetaData("FACILITY", SqlDbType.VarChar, 3),
        new SqlMetaData("CUSTID", SqlDbType.VarChar, 10),
        new SqlMetaData("EQUIPMENT", SqlDbType.VarChar, 2),
        new SqlMetaData("UNITS", SqlDbType.Int),
        new SqlMetaData("ETC", SqlDbType.NVarChar, 255),       // nvarchar(255) in the table type
        new SqlMetaData("ORDERID", SqlDbType.Int),
        new SqlMetaData("SHIPID", SqlDbType.SmallInt),
        new SqlMetaData("LOCATION", SqlDbType.VarChar, 10),
        new SqlMetaData("LPID", SqlDbType.VarChar, 20),
        new SqlMetaData("ITEM", SqlDbType.VarChar, 50),
        new SqlMetaData("UOM", SqlDbType.VarChar, 4),
        new SqlMetaData("BASEUOM", SqlDbType.VarChar, 4),
        new SqlMetaData("BASEUNITS", SqlDbType.Int),
        new SqlMetaData("CUBE", SqlDbType.Decimal, 10, 4),     // decimal(10, 4) in the table type
        new SqlMetaData("WEIGHT", SqlDbType.Decimal, 17, 8),
        new SqlMetaData("EMPLOYEECOST", SqlDbType.Decimal, 10, 2),
        new SqlMetaData("EQUIPMENTCOST", SqlDbType.Decimal, 10, 2)
    });

    while (reader.Read())
    {
        try
        {
            // Each record field is filled from the reader column with the same name, so a
            // field can no longer be populated from the wrong source column by a copy/paste slip.
            for (int ordinal = 0; ordinal < resultRow.FieldCount; ordinal++)
            {
                CopyColumn(reader, resultRow, ordinal);
            }
        }
        catch (Exception ex)
        {
            logger.Error(ex, "Setting up resultrow Error");

            // The shared record is only partially overwritten at this point; do not send it.
            continue;
        }

        yield return resultRow;
    }
}

/// <summary>
/// Copies the reader column named after field <paramref name="ordinal"/> of
/// <paramref name="row"/> into that field, preserving database NULLs.
/// </summary>
/// <param name="reader">Reader positioned on the current source row.</param>
/// <param name="row">Record being populated.</param>
/// <param name="ordinal">Zero-based field index within <paramref name="row"/>.</param>
/// <exception cref="NotSupportedException">The field's <see cref="SqlDbType"/> has no mapping here.</exception>
private static void CopyColumn(OracleDataReader reader, SqlDataRecord row, int ordinal)
{
    SqlMetaData column = row.GetSqlMetaData(ordinal);
    object value = reader[column.Name];

    if (value is null || value is DBNull)
    {
        // Send a real NULL instead of "", 0 or DateTime.MinValue.
        // NOTE(review): NAMEID, BEGTIME and EVENT are NOT NULL in the table type, so a NULL in
        // one of them will be rejected by SQL Server - confirm the source never returns one.
        row.SetDBNull(ordinal);
        return;
    }

    // Convert.* is used instead of an unboxing cast so the runtime numeric type handed back by
    // the reader does not have to match the target type exactly.
    System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

    switch (column.SqlDbType)
    {
        case SqlDbType.VarChar:
        case SqlDbType.NVarChar:
            row.SetString(ordinal, Convert.ToString(value, invariant));
            break;

        case SqlDbType.DateTime:
            row.SetDateTime(ordinal, Convert.ToDateTime(value, invariant));
            break;

        case SqlDbType.Int:
            row.SetInt32(ordinal, Convert.ToInt32(value, invariant));
            break;

        case SqlDbType.SmallInt:
            row.SetInt16(ordinal, Convert.ToInt16(value, invariant));
            break;

        case SqlDbType.Decimal:
            // Round to the declared scale so the value fits the column metadata.
            decimal number = Convert.ToDecimal(value, invariant);
            row.SetDecimal(ordinal, decimal.Round(number, column.Scale, MidpointRounding.AwayFromZero));
            break;

        default:
            throw new NotSupportedException(
                "No conversion is defined for column '" + column.Name + "' of type " + column.SqlDbType + ".");
    }
}

/// <summary>
/// Converts a value read from a data reader to <typeparamref name="T"/>, mapping
/// <c>null</c> and <see cref="DBNull"/> to <c>default(T)</c>.
/// </summary>
/// <typeparam name="T">Target type; may be a nullable value type or an enum.</typeparam>
/// <param name="obj">Raw value, possibly <c>null</c> or <see cref="DBNull.Value"/>.</param>
/// <returns>
/// <c>default(T)</c> for a missing value; <paramref name="obj"/> itself when it already is a
/// <typeparamref name="T"/>; otherwise the value converted with the invariant culture.
/// </returns>
/// <exception cref="InvalidCastException">No conversion to <typeparamref name="T"/> exists.</exception>
/// <exception cref="FormatException">A string value is not in a format the target type accepts.</exception>
/// <exception cref="OverflowException">A numeric value does not fit the target type.</exception>
public static T ConvertFromDBVal<T>(object obj)
{
    if (obj is null || obj is DBNull)
    {
        return default(T); // returns the default value for the type
    }

    if (obj is T typed)
    {
        return typed;
    }

    // A plain (T)obj is an unboxing cast and throws unless the boxed runtime type is exactly T
    // (for example a boxed decimal requested as int), so fall back to a real conversion.
    Type target = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);

    if (target.IsEnum)
    {
        // Convert.ChangeType cannot produce enum values from their underlying integers.
        return (T)Enum.ToObject(target, obj);
    }

    return (T)Convert.ChangeType(obj, target, System.Globalization.CultureInfo.InvariantCulture);
}

本文标签: sql serverData type 0x2A has an invalid data length or metadata lengthStack Overflow