PySpark - Applying User defined function (UDF) to entire row of dataframe
I am quite new to PySpark, though a frequent user of pandas etc.
I have a Spark dataframe consisting of the columns: ID, Trajectory, types,
where ID is an integer, Trajectory is a list of rows containing coordinate information, and types is a list of strings.
The goal is, for each ID, to draw a polyline on a Python folium map based on the coordinates in the Trajectory column.
As a reference,
df.first()
produces the following:
Row(ID=209223000, Trajectory=[Row(Timestamp=datetime.datetime(2024, 11, 1, 2, 58, 29), Latitude=55.108018, Longitude=18.306343, Destination='Unknown'), Row(Timestamp=datetime.datetime(2024, 11, 1, 6, 18, 18), Latitude=55.115625, Longitude=18.103083, Destination='Unknown')], types=['Undefined'])
To be clear, I have no need for an extra column being added to the dataframe, but no objections if that is needed either.
So the pseudocode would look something like this:
create map object m
for each row in df:
    create polyline object ply
    ply.coordinates = [(item.Latitude, item.Longitude) for item in row['Trajectory']]
    ply.tooltip = row['ID'] + row['types']
    add ply to m
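Translated into actual code, what I picture is something like the sketch below (collecting the rows to the driver, which I suspect is not the "Spark way"; the map centre and zoom are just placeholders):

import folium

# create the map object on the driver
m = folium.Map(location=[55.0, 18.0], zoom_start=7)

# toLocalIterator() streams the rows to the driver one at a time
for row in df.toLocalIterator():
    coords = [(p.Latitude, p.Longitude) for p in row.Trajectory]
    folium.PolyLine(
        locations=coords,
        tooltip=f"{row.ID} {row.types}",
    ).add_to(m)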
The end goal is to showcase the tracks of objects depending on certain filters applied to the dataframe.
I've tried to create a UDF from the pseudocode described above, but I can't wrap my head around how to apply it, since a UDF is applied to a column rather than to a dataframe.
The expected result would be that polylines are added to the map object based on each row of the dataframe, i.e. one per individual ID.
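For reference, the closest I've come to applying a UDF to a whole row is packing all columns into a struct, roughly like this (a sketch of my attempt; I'm not even sure a UDF running on the executors can add anything to a map object living on the driver):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def draw_polyline(row):
    # the struct arrives here as a Row with all original columns
    coords = [(p.Latitude, p.Longitude) for p in row.Trajectory]
    # ... create a polyline from coords and add it to the map m ...
    return "drawn"  # a UDF has to return a column value

draw_udf = F.udf(draw_polyline, StringType())

# struct(*df.columns) passes the entire row as a single column
df.withColumn("status", draw_udf(F.struct(*df.columns)))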
EDIT:
Schema:
|-- ID: integer (nullable = true)
|-- Trajectory: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- Timestamp: timestamp (nullable = true)
| | |-- Latitude: double (nullable = true)
| | |-- Longitude: double (nullable = true)
| | |-- Destination: string (nullable = true)
|-- types: array (nullable = false)
| |-- element: string (containsNull = false)
A snapshot (compressed) of the data:
+---------+--------------------+--------------------+
| ID | Trajectory| types|
+---------+--------------------+--------------------+
|209223000|[{2024-11-01 02:5...| [Undefined]|
|209508000|[{2024-11-01 08:2...| [Tanker, Undefined]|
|209864000|[{2024-11-01 14:4...|[Passenger, Undef...|
|210095000|[{2024-11-01 08:2...|[Passenger, Undef...|
|210350000|[{2024-11-01 00:0...| [Undefined]|
+---------+--------------------+--------------------+
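In case it helps anyone reproduce this, a toy dataframe with the same schema can be built from the first row shown earlier:

import datetime
from pyspark.sql import Row

toy_df = spark.createDataFrame([
    Row(
        ID=209223000,
        Trajectory=[
            Row(Timestamp=datetime.datetime(2024, 11, 1, 2, 58, 29),
                Latitude=55.108018, Longitude=18.306343,
                Destination='Unknown'),
            Row(Timestamp=datetime.datetime(2024, 11, 1, 6, 18, 18),
                Latitude=55.115625, Longitude=18.103083,
                Destination='Unknown'),
        ],
        types=['Undefined'],
    ),
])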
EDIT no2:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AIS_Data_Analysis").getOrCreate()

# exact file as used is available at "/"
datapath = "AIS_data"
filename = "aisdk-2024-11-01.csv"
path = os.path.join(datapath, filename)

df_pyspark = spark.read.csv(path, header=True, inferSchema=True)
df_pyspark.printSchema()
df_pyspark.collect()
produces:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\IPython\core\interactiveshell.py", line 3579, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "C:\Users\seolase1\AppData\Local\Temp\ipykernel_16900\381113642.py", line 1, in <module>
df_pyspark.collect()
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\pyspark\sql\dataframe.py", line 1263, in collect
sock_info = self._jdf.collectToPython()
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\pyspark\errors\exceptions\captured.py", line 179, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.2544.0_x64__qbz5n2kfra8p0\Lib\socket.py", line 706, in readinto
return self._sock.recv_into(b)
^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving