I am trying to create a scalar Python UDF to parse a VARBYTE column in Redshift.
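Sample code:

```sql
CREATE OR REPLACE FUNCTION decode_kafka_event_protobuf(proto_data VARCHAR(MAX))
RETURNS VARCHAR(MAX) VOLATILE AS $$
import base64
from com.example.events import kafka_event_pb2
from google.protobuf.json_format import MessageToJson

binary_str = base64.b64decode(proto_data)
# ParseFromString is a method of a message instance, not of the generated module.
# KafkaEvent is a placeholder: substitute the actual message class in kafka_event_pb2.
proto_obj = kafka_event_pb2.KafkaEvent()
proto_obj.ParseFromString(bytes(binary_str))
# Convert proto_obj to a JSON string (MessageToJson is one option) and return it
return MessageToJson(proto_obj)
$$ LANGUAGE plpythonu;
```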
Because Redshift doesn't support VARBYTE arguments in Python UDFs, I convert the VARBYTE column (kafka_value) to a base64-encoded string with the FROM_VARBYTE function, as in this sample query:

```sql
SELECT FROM_VARBYTE(kafka_value, 'base64') FROM my_table;
```
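The UDF relies on base64.b64decode turning the FROM_VARBYTE output back into exactly the bytes stored in kafka_value. One detail worth knowing when comparing inputs: by default b64decode silently discards characters outside the base64 alphabet, so two strings that differ only by such characters decode to the same bytes. Below is a minimal local check in Python 3 (validate=True is not available in the Python 2.7 runtime that plpythonu uses); it assumes FROM_VARBYTE emits standard base64, and the payload is hypothetical, not real Kafka data.

```python
import base64
import binascii

raw = b"\x08\x96\x01\x12\x05hello"  # hypothetical protobuf bytes for one kafka_value cell
text = base64.b64encode(raw).decode("ascii")  # stands in for FROM_VARBYTE(kafka_value, 'base64')

# Round trip: decoding the base64 text restores the original bytes.
assert base64.b64decode(text) == raw

# Default decoding silently drops a stray newline inside the text...
with_newline = text[:4] + "\n" + text[4:]
assert base64.b64decode(with_newline) == raw

# ...while strict decoding (Python 3 only) rejects it.
try:
    base64.b64decode(with_newline, validate=True)
    strict_ok = True
except binascii.Error:
    strict_ok = False
print("strict decode accepted input:", strict_ok)
```

If a check like this ever disagrees with the default decode, the argument contains characters that a plain visual comparison may not reveal.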
My question is: what is the difference between these:

```sql
SELECT FROM_VARBYTE(kafka_value, 'base64') FROM my_table;
-- returns the base64-encoded string, BASE64_STR = abcd....

SELECT decode_kafka_event_protobuf('abcd....') FROM my_table;
-- works and returns the JSON
```

versus

```sql
SELECT decode_kafka_event_protobuf(FROM_VARBYTE(kafka_value, 'base64')) FROM my_table;
-- fails with the decode error
```
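One difference between the two queries is worth ruling out: the literal version passes the same constant to the UDF for every row of my_table, whereas the FROM_VARBYTE version passes each row's own kafka_value, so a single row that fails to parse makes the whole query fail. A minimal sketch of a wrapper that reports per-row failures instead of raising is below; `decode_or_report` and `parse_event` are hypothetical names, with `parse_event` standing in for the real protobuf parse and JSON conversion, and `fake_parse` is a toy parser used only for the demo.

```python
import base64


def decode_or_report(proto_data, parse_event):
    """Decode one row and return (ok, result_or_error) instead of raising.

    parse_event is a hypothetical callable standing in for the protobuf
    parse + JSON conversion that the real UDF performs.
    """
    if proto_data is None:  # SQL NULL arrives in a Python UDF as None
        return False, "NULL input"
    try:
        payload = base64.b64decode(proto_data)
    except Exception as exc:  # e.g. bad padding in the base64 text
        return False, "base64 error: %s" % exc
    try:
        return True, parse_event(payload)
    except Exception as exc:  # e.g. protobuf's DecodeError
        return False, "parse error on %d bytes: %s" % (len(payload), exc)


def fake_parse(payload):
    """Toy parser for the demo: accepts only payloads starting with byte 0x08."""
    if payload[:1] != b"\x08":
        raise ValueError("Wrong wire type in tag.")
    return '{"ok": true}'


rows = [
    base64.b64encode(b"\x08\x01").decode("ascii"),  # parses
    base64.b64encode(b"\xff\x01").decode("ascii"),  # rejected by the toy parser
    None,                                           # SQL NULL
]
results = [decode_or_report(row, fake_parse) for row in rows]
for r in results:
    print(r)
```

Inside the UDF, returning the second element as the VARCHAR result (an assumed convention, not Redshift behavior) would let a query filter for results starting with "parse error" and list the offending rows.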
Also, if I copy the BASE64_STR string into the UDF body (hardcoding its value), the parser works fine.
I tried comparing the hardcoded BASE64_STR and proto_data in every way I could think of (checking their types, comparing the strings character by character, converting both to bytes and comparing, converting both to hex strings and comparing), and they appear identical.
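A compact way to repeat that comparison across the two calls is to reduce each argument to a fingerprint (type, length, a SHA-256 prefix of its bytes, and the repr of both ends) and return it from a throwaway UDF, once with the literal and once with FROM_VARBYTE(kafka_value, 'base64') for the row the literal was copied from. The helper below is hypothetical, not part of the original UDF; it is written to run under both Python 2.7 (used by plpythonu) and Python 3, and the two sample strings are made up for the demo.

```python
import hashlib


def fingerprint(value):
    """Summarize a string argument: type, length, SHA-256 prefix and both ends."""
    if value is None:
        return "None"
    # On Python 2 a plain str is already bytes; unicode (and Python 3 str) is encoded.
    data = value if isinstance(value, bytes) else value.encode("utf-8")
    return "%s len=%d sha256=%s head=%r tail=%r" % (
        type(value).__name__, len(value),
        hashlib.sha256(data).hexdigest()[:16], value[:8], value[-8:])


literal = "QUJDRA=="        # hypothetical stand-in for the hardcoded BASE64_STR
from_column = "QUJDRA==\n"  # hypothetical argument differing only by a trailing newline
print(fingerprint(literal))
print(fingerprint(from_column))
print(fingerprint(literal) == fingerprint(from_column))
```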
So, are there any undocumented details about VARCHAR arguments in Python UDFs that could explain this? Any pointers would be really appreciated. Thank you in advance!
EDIT 1

Here is the full error message from svl_udf_log:
```
line 130, in decode_kafka_event_protobuf
File "0.py", line 116, in decode_protobuf
File "message.py", line 199, in ParseFromString
  return self.MergeFromString(serialized)
File "python_message.py", line 1134, in MergeFromString
  if self._InternalParse(serialized, 0, length) != length:
File "python_message.py", line 1201, in InternalParse
  pos = field_decoder(buffer, new_pos, end, self, field_dict)
File "decoder.py", line 738, in DecodeField
  if value._InternalParse(buffer, pos, new_pos) != new_pos:
File "python_message.py", line 1188, in InternalParse
  buffer, new_pos, wire_type) # pylint: disable=protected-access
File "decoder.py", line 977, in _DecodeUnknownField
  raise _DecodeError('Wrong wire type in tag.')
```
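For context on the error: in the protobuf wire format each field starts with a varint tag whose low three bits are the wire type (0 varint, 1 64-bit, 2 length-delimited, 5 32-bit; 3 and 4 are deprecated group markers), and the traceback shows the pure-Python decoder raising "Wrong wire type in tag." from _DecodeUnknownField, which happens when an unrecognized field carries a wire type it cannot handle (such as 6 or 7). The traceback also passes through DecodeField into a nested _InternalParse, so the bad tag appears inside a sub-message. The sketch below is a simplified, stand-alone tag scanner written for illustration (not the google.protobuf implementation); it lists the top-level fields of a payload, which may help locate where a failing row's bytes stop looking like valid protobuf. It does not recurse into nested messages.

```python
# Wire types defined by the protobuf encoding; 3 and 4 are deprecated groups.
WIRE_TYPES = {0: "varint", 1: "64-bit", 2: "length-delimited",
              3: "start group", 4: "end group", 5: "32-bit"}


def read_varint(buf, pos):
    """Decode a base-128 varint from buf starting at pos; return (value, new_pos)."""
    result, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint at offset %d" % pos)
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def scan_top_level(data):
    """Return (offset, field_number, description) for each top-level field.

    Simplified: does not recurse into nested messages and stops at groups
    or invalid wire types.
    """
    buf = bytearray(data)  # indexing yields ints on both Python 2 and 3
    fields = []
    pos = 0
    while pos < len(buf):
        start = pos
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x7
        if wire_type not in WIRE_TYPES:
            fields.append((start, field_number, "invalid wire type %d" % wire_type))
            break
        fields.append((start, field_number, WIRE_TYPES[wire_type]))
        if wire_type == 0:
            _, pos = read_varint(buf, pos)
        elif wire_type == 1:
            pos += 8
        elif wire_type == 5:
            pos += 4
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            pos += length
        else:  # groups (3, 4) are not handled by this sketch
            break
    return fields


print(scan_top_level(b"\x08\x96\x01\x12\x05hello"))  # [(0, 1, 'varint'), (3, 2, 'length-delimited')]
print(scan_top_level(b"\x0f"))                       # [(0, 1, 'invalid wire type 7')]
```

Running it on the base64-decoded bytes of a failing row and of the working literal would show whether the two payloads even share the same top-level layout.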
I also tried the hexadecimal format instead of base64 (changing the UDF to convert the hex string to binary with binascii.unhexlify()):

```sql
SELECT decode_kafka_event_protobuf(FROM_VARBYTE(kafka_value, 'hex')) FROM my_table;
```

but I still got the same error.
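Switching encodings only changes how the bytes travel as text: binascii.unhexlify on the hex form and base64.b64decode on the base64 form should recover the same VARBYTE contents, which is consistent with getting the same error either way. A minimal check, using a hypothetical payload and assuming FROM_VARBYTE produces ordinary hex and base64 text:

```python
import base64
import binascii

raw = b"\x08\x96\x01\x12\x05hello"  # hypothetical payload standing in for kafka_value

# Text forms standing in for FROM_VARBYTE(kafka_value, 'hex') and (..., 'base64').
hex_text = binascii.hexlify(raw).decode("ascii")
b64_text = base64.b64encode(raw).decode("ascii")

# Decoding either text form yields the same bytes; unhexlify accepts upper- or lower-case digits.
from_hex = binascii.unhexlify(hex_text)
from_hex_upper = binascii.unhexlify(hex_text.upper())
from_b64 = base64.b64decode(b64_text)

print(from_hex == from_b64 == raw)  # True
```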
Original title: AWS Redshift VARCHAR argument inside Scalar Python UDF inconsistency - Stack Overflow