
I'm passing a variable, as a string, to my data flow's parameter called LastRunTimestamp. The timestamp is in this format: 2024-11-21T00:00:00.0000000Z

My query is something like "SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '$LastRunTimestamp'"

The query doesn't error out, but it's definitely disregarding the parameter. If I hard-code the value, or test the query in Data Explorer, it works. But I just cannot get this to work as a parameter.

I've tried quite a few different ways of doing this, including:

concat('SELECT * FROM c WHERE c.InsertDateTime > ','D1',' AND c.InsertDateTime > ',$LastRunTimestamp)

But every time I get the query to run, it seems to disregard the parameter. I've also tried building the string without the concat function, among other things. I don't know what else I can try.

JSON Definition of data flow

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "ContainerTesting",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "TestTable",
                        "type": "DatasetReference"
                    },
                    "name": "sink2"
                },
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "sink3"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "derivedColumn2"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     LastRunTimestamp as string,",
                "}",
                "source(output(",
                "          Id as integer,",
                "          UpdateDateTime as string,",
                "          PartitionKey as integer,",
                "          Discriminator as string,",
                "          InsertDateTime as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     limit: 100,",
                "     query: (\"SELECT c.Id, c.Discriminator, c.PartitionKey, c.InsertDateTime, c.UpdateDateTime FROM c where c.Discriminator = 'D1' AND c.InsertDateTime > '$LastRunTimestamp'\"),",
                "     format: 'documentQuery',",
                "     systemColumns: false) ~> source1",
                "source1 derive(Blob_Location = concat('REDACTED', toString(Id), '.json'),",
                "          Load_Time = toString(currentUTC(),'yyyy-MM-dd HH:mm:ss'),",
                "          LastTimestamp = $LastRunTimestamp) ~> derivedColumn1",
                "source1 derive(IdAsString = toString(Id)) ~> derivedColumn2",
                "derivedColumn1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> sink2",
                "derivedColumn2 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     rowUrlColumn:'IdAsString',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     mapColumn(",
                "          Id,",
                "          UpdateDateTime,",
                "          PartitionKey,",
                "          Discriminator,",
                "          InsertDateTime,",
                "          IdAsString",
                "     )) ~> sink3"
            ]
        }
    }
}

Asked Nov 21, 2024 at 21:03 by BlakeB9; edited Nov 21, 2024 at 23:36.
  • Can you add the JSON definition of your dataflow? (With whatever redactions you need?) – Martin Smith Commented Nov 21, 2024 at 21:13
  • @MartinSmith Updated – BlakeB9 Commented Nov 21, 2024 at 23:36

1 Answer

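To use a parameter in a Cosmos DB source query, build the query as a dynamic expression and enclose the string parameter's value in single quotes. When $LastRunTimestamp is written inside the quoted query text, as in the question, it appears to be passed through as literal text rather than replaced with the parameter's value, which would explain why the query runs without error but ignores the parameter.

Pass the query like this instead:

concat("SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '", $LastRunTimestamp, "'")

The data flow then substitutes the parameter value into the query. With the timestamp from the question, the query sent to Cosmos DB becomes:

SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '2024-11-21T00:00:00.0000000Z'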
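To check the quote placement before pasting the expression into the data flow, it can help to reproduce the string-building step outside of it. The following is only a minimal Python sketch, not data flow expression code: the `concat` helper is a hypothetical stand-in that simply joins strings, and the timestamp is the sample value from the question.

```python
def concat(*parts: str) -> str:
    """Hypothetical stand-in for the data flow concat() function: joins strings."""
    return "".join(parts)


# Sample value taken from the question; in the pipeline this comes from the parameter.
last_run_timestamp = "2024-11-21T00:00:00.0000000Z"

# Parameter name written inside the quoted text: the name itself stays in the query.
literal_query = (
    "SELECT * FROM c WHERE c.Discriminator = 'D1' "
    "AND c.InsertDateTime > '$LastRunTimestamp'"
)

# Parameter value concatenated between the opening and closing single quotes.
built_query = concat(
    "SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '",
    last_run_timestamp,
    "'",
)

print(literal_query)
print(built_query)
```

Comparing the two printed lines shows the difference the answer relies on: the first still contains the text $LastRunTimestamp, while the second contains the timestamp wrapped in single quotes.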
