admin管理员组文章数量:1415645
/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
    -- Columns decoded from the JSON message value.
    `pk`           STRING,
    `id`           STRING,
    `name`         STRING,
    -- Populated from connector metadata rather than from the payload.
    `headers`      MAP<STRING, BYTES> METADATA,
    `hard_deleted` BOOLEAN,
    -- Filled from the message key (see 'key.format' / 'key.fields' below).
    `kafka_key`    STRING,
    -- Connector metadata key 'timestamp', declared VIRTUAL.
    `ts`           TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    -- Computed column holding the processing time.
    `procTime`     AS PROCTIME(),
    -- Watermark is the ts value itself (no delay subtracted).
    WATERMARK FOR ts AS ts
) WITH (
    -- Connection
    'connector'                    = 'kafka',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id'          = 'group_id_1',

    -- Source topics and starting position
    'topic-pattern'                = '^topic(_backfill)?$',
    'scan.startup.mode'            = 'earliest-offset',

    -- Key: raw bytes mapped onto kafka_key and excluded from the value schema
    'key.format'                   = 'raw',
    'key.fields'                   = 'kafka_key',
    'value.fields-include'         = 'EXCEPT_KEY',

    -- Value: JSON
    -- NOTE(review): 'format' and 'value.format' are both set to 'json'; confirm both are needed.
    'format'                       = 'json',
    'value.format'                 = 'json',

    -- JSON decoding options
    'json.timestamp-format.standard' = 'ISO-8601',
    'json.fail-on-missing-field'     = 'false',
    'json.ignore-parse-errors'       = 'true'
);
I have the above Kafka table and am trying to run this query:
-- Attempt to read the previous pk for each id, ordered by processing time.
-- NOTE: this is the failing form. The explicit ROWS frame inside the OVER
-- clause is what the validator rejects (see the error quoted below).
SELECT LAG(pk)
OVER (
PARTITION BY id ORDER BY procTime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
)
AS prev_data_hash
FROM eoj_table;
I get this error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
How do I use the LAG function and have it look at only the current row and the previous row?
/** mode('streaming')*/
CREATE OR REPLACE TABLE eoj_table (
    -- Columns decoded from the JSON message value.
    `pk`           STRING,
    `id`           STRING,
    `name`         STRING,
    -- Populated from connector metadata rather than from the payload.
    `headers`      MAP<STRING, BYTES> METADATA,
    `hard_deleted` BOOLEAN,
    -- Filled from the message key (see 'key.format' / 'key.fields' below).
    `kafka_key`    STRING,
    -- Connector metadata key 'timestamp', declared VIRTUAL.
    `ts`           TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
    -- Computed column holding the processing time.
    `procTime`     AS PROCTIME(),
    -- Watermark is the ts value itself (no delay subtracted).
    WATERMARK FOR ts AS ts
) WITH (
    -- Connection
    'connector'                    = 'kafka',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id'          = 'group_id_1',

    -- Source topics and starting position
    'topic-pattern'                = '^topic(_backfill)?$',
    'scan.startup.mode'            = 'earliest-offset',

    -- Key: raw bytes mapped onto kafka_key and excluded from the value schema
    'key.format'                   = 'raw',
    'key.fields'                   = 'kafka_key',
    'value.fields-include'         = 'EXCEPT_KEY',

    -- Value: JSON
    -- NOTE(review): 'format' and 'value.format' are both set to 'json'; confirm both are needed.
    'format'                       = 'json',
    'value.format'                 = 'json',

    -- JSON decoding options
    'json.timestamp-format.standard' = 'ISO-8601',
    'json.fail-on-missing-field'     = 'false',
    'json.ignore-parse-errors'       = 'true'
);
I have the above Kafka table and am trying to run this query:
-- Attempt to read the previous pk for each id, ordered by processing time.
-- NOTE: this is the failing form. The explicit ROWS frame inside the OVER
-- clause is what the validator rejects (see the error quoted below).
SELECT LAG(pk)
OVER (
PARTITION BY id ORDER BY procTime ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
)
AS prev_data_hash
FROM eoj_table;
I get this error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions
How do I use the LAG function and have it look at only the current row and the previous row?
1 Answer
You can do it like this:
-- Previous pk within each id partition, ordered by processing time.
-- No ROWS/RANGE frame is given; the offset argument (1) selects the row
-- immediately before the current one.
SELECT
    LAG(pk, 1) OVER (
        PARTITION BY id
        ORDER BY procTime
    ) AS prev_data_hash
FROM eoj_table;
This query doesn't specify a number of ROWS or a RANGE, but that doesn't matter. It's the LAG function that manages the state for this query, and its implementation only keeps as many previous values in its accumulator as necessary (in this case, it will keep 2).
If you're curious, here's the relevant code:
/**
 * Adds {@code value} to the accumulator's buffer, then removes entries from
 * the front of the buffer until it holds at most {@code acc.offset + 1} values.
 *
 * @param acc   accumulator carrying the value buffer and the offset
 * @param value value to add to the buffer
 * @throws Exception declared by the signature; nothing is thrown explicitly here
 */
public void accumulate(LagAcc<T> acc, T value) throws Exception {
acc.buffer.add(value);
// Trim so that only offset + 1 values are retained.
while (acc.buffer.size() > acc.offset + 1) {
acc.buffer.removeFirst();
}
}
In this case, the value of the offset is 1, so it keeps a buffer of 2 values.
本文标签: Flink sql LAG function with windowframe does not workStack Overflow
版权声明:本文标题:Flink sql LAG function with window_frame does not work - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1745240438a2649295.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论