

I am quite new to PySpark, though I am a frequent user of pandas etc.

I have a Spark DataFrame with the columns ID, Trajectory and types, where ID is an integer, Trajectory is a list of rows (structs) holding coordinate information, and types is a list of strings.

The goal is to draw, for each ID, a polyline on a Python folium map based on the coordinates in the Trajectory list.

For reference, df.first() produces the following output:

Row(ID=209223000, Trajectory=[Row(Timestamp=datetime.datetime(2024, 11, 1, 2, 58, 29), Latitude=55.108018, Longitude=18.306343, Destination='Unknown'), Row(Timestamp=datetime.datetime(2024, 11, 1, 6, 18, 18), Latitude=55.115625, Longitude=18.103083, Destination='Unknown')], types=['Undefined'])

To be clear, I don't need an extra column to be added to the DataFrame, but I have no objection if one is needed either.

So the pseudocode would look something like this:

create map-object m

For each Row in df:
    create polyline object ply
    ply.coordinates = [item.Latitude, item.Longitude] for item in Row['Trajectory']
    ply.tooltip = Row['ID'] + Row['types']
    add ply to m
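Because the folium map lives in the driver process, one way to realise the pseudocode (a sketch, not necessarily the intended approach) is to skip the UDF entirely and loop over the rows on the driver. It assumes the rows carry the attribute names shown by df.first() (ID, Trajectory with Latitude/Longitude, types). The helper names trajectory_coords, tooltip_text and add_tracks are hypothetical; folium.PolyLine and add_to are folium's own API.

```python
from typing import Iterable, List, Tuple


def trajectory_coords(trajectory) -> List[Tuple[float, float]]:
    """Convert the Trajectory array (a list of Row structs) into (lat, lon) pairs."""
    return [(point.Latitude, point.Longitude) for point in trajectory]


def tooltip_text(row) -> str:
    """Combine the ID and the types list into one string, e.g. '209223000: Undefined'."""
    return f"{row.ID}: {', '.join(row.types)}"


def add_tracks(m, rows: Iterable) -> int:
    """Add one folium PolyLine per row to the map m and return how many were added."""
    import folium  # imported lazily so the pure helpers above work without folium

    added = 0
    for row in rows:
        coords = trajectory_coords(row.Trajectory)
        if len(coords) < 2:
            continue  # skip tracks with fewer than two points; they cannot form a line
        folium.PolyLine(locations=coords, tooltip=tooltip_text(row)).add_to(m)
        added += 1
    return added
```

Usage would be along the lines of `m = folium.Map(location=[55.1, 18.2], zoom_start=7)`, then `add_tracks(m, df.toLocalIterator())` and `m.save("tracks.html")`; the map centre and zoom level are arbitrary example values. toLocalIterator is used instead of collect so that rows arrive one partition at a time.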

The end goal is to show the tracks of objects depending on which filters are applied to the DataFrame.

I've tried to create a UDF from the pseudocode above, but I can't wrap my head around how to apply it, since a UDF is applied to a column rather than to a whole DataFrame.
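On the UDF question: a UDF runs on the executors and only produces a value for a column, so it cannot add anything to a folium map object held by the driver. It can still see a whole row if the columns are packed into one struct with F.struct(*df.columns). Below is a hedged sketch of both routes, assuming Spark 3.1 or later (needed for pyspark.sql.functions.transform); format_tooltip, add_tooltip_udf, prepare_tracks and draw are hypothetical names.

```python
def format_tooltip(vessel_id, types) -> str:
    """Plain-Python tooltip formatting, shared by the UDF below."""
    return f"{vessel_id}: {', '.join(types)}"


def add_tooltip_udf(df):
    """Option 1: a Python UDF that receives the entire row as a struct."""
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    @F.udf(returnType=StringType())
    def row_tooltip(row):
        # `row` arrives as a pyspark.sql.Row containing every column packed by F.struct
        return format_tooltip(row.ID, row.types)

    return df.withColumn("tooltip", row_tooltip(F.struct(*df.columns)))


def prepare_tracks(df):
    """Option 2: built-in functions only (no Python UDF), keeping just what the map needs."""
    from pyspark.sql import functions as F

    return df.select(
        "ID",
        # array<struct> -> array<array<double>> of [lat, lon] pairs
        F.transform("Trajectory", lambda p: F.array(p["Latitude"], p["Longitude"])).alias("coords"),
        # "ID: type1, type2" label; concat_ws also joins the elements of an array<string>
        F.concat_ws(": ", F.col("ID").cast("string"), F.concat_ws(", ", "types")).alias("tooltip"),
    )


def draw(m, prepared_df):
    """Driver-side loop: one PolyLine per row of the prepared DataFrame."""
    import folium

    for row in prepared_df.toLocalIterator():
        folium.PolyLine(locations=row.coords, tooltip=row.tooltip).add_to(m)
    return m
```

Filters, as in the end goal, would be applied before anything reaches the driver, e.g. `draw(m, prepare_tracks(df.filter(F.array_contains("types", "Tanker"))))` with `F` being `pyspark.sql.functions`. Option 2 is generally the lighter choice because it avoids shipping rows to Python workers; option 1 is included only because it answers the "whole row in a UDF" part directly.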

The expected result is that one polyline is added to the map object for each row of the DataFrame, i.e. for each individual ID.



The output of df.printSchema():

root
 |-- ID: integer (nullable = true)
 |-- Trajectory: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- Timestamp: timestamp (nullable = true)
 |    |    |-- Latitude: double (nullable = true)
 |    |    |-- Longitude: double (nullable = true)
 |    |    |-- Destination: string (nullable = true)
 |-- types: array (nullable = false)
 |    |-- element: string (containsNull = false)
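For anyone trying to reproduce this, the schema above and the row printed by df.first() can be turned into a small test DataFrame. This is a hypothetical sketch: SCHEMA_DDL, SAMPLE_ROWS and make_sample_df are illustrative names, and the DDL string leaves every column nullable, unlike the nullable = false flags in the printed schema.

```python
import datetime

# DDL version of the printed schema (all fields nullable by default)
SCHEMA_DDL = (
    "ID INT, "
    "Trajectory ARRAY<STRUCT<`Timestamp`: TIMESTAMP, Latitude: DOUBLE, "
    "Longitude: DOUBLE, Destination: STRING>>, "
    "types ARRAY<STRING>"
)

# The row returned by df.first(), written as plain tuples in column order
SAMPLE_ROWS = [
    (
        209223000,
        [
            (datetime.datetime(2024, 11, 1, 2, 58, 29), 55.108018, 18.306343, "Unknown"),
            (datetime.datetime(2024, 11, 1, 6, 18, 18), 55.115625, 18.103083, "Unknown"),
        ],
        ["Undefined"],
    ),
]


def make_sample_df(spark):
    """Build a one-row DataFrame with the same shape as the real data."""
    return spark.createDataFrame(SAMPLE_ROWS, schema=SCHEMA_DDL)
```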

A (truncated) snapshot of the data:

|     ID  |          Trajectory|               types|
|---------|--------------------|--------------------|
|209223000|[{2024-11-01 02:5...|         [Undefined]|
|209508000|[{2024-11-01 08:2...| [Tanker, Undefined]|
|209864000|[{2024-11-01 14:4...|[Passenger, Undef...|
|210095000|[{2024-11-01 08:2...|[Passenger, Undef...|
|210350000|[{2024-11-01 00:0...|         [Undefined]|

EDIT 2:

import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AIS_Data_Analysis").getOrCreate()

# exact file as used is available at "/"
datapath = "AIS_data"

df = spark.read.csv(path=os.path.join(datapath, filename), header=True, inferSchema=True)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\IPython\core\", line 3579, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\seolase1\AppData\Local\Temp\ipykernel_16900\", line 1, in <module>
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\pyspark\sql\", line 1263, in collect
    sock_info = self._jdf.collectToPython()
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\", line 1322, in __call__
    return_value = get_return_value(
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\pyspark\errors\exceptions\", line 179, in deco
    return f(*a, **kw)
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\", line 511, in send_command
    answer = smart_decode([:-1])
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.2544.0_x64__qbz5n2kfra8p0\Lib\", line 706, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
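The Java-side message in the traceback above is hidden (<exception str() failed>), so the root cause cannot be read from it directly. A common reason for collectToPython failing and the Py4J connection then being reset is that the driver JVM ran out of memory and died while collecting a large DataFrame; that is an assumption, not something the trace proves. If it is the cause, two mitigations are raising spark.driver.memory and streaming rows instead of collecting them all at once. build_session and iter_rows are hypothetical names, and "4g" is an arbitrary example value.

```python
def build_session(app_name: str = "AIS_Data_Analysis", driver_memory: str = "4g"):
    """Create the session with more driver memory (4g is only an example value)."""
    from pyspark.sql import SparkSession

    # spark.driver.memory is read when the JVM starts, so it only applies in a
    # fresh Python process / notebook kernel, not to an already running session.
    return (
        SparkSession.builder.appName(app_name)
        .config("spark.driver.memory", driver_memory)
        .getOrCreate()
    )


def iter_rows(df, max_rows=None):
    """Stream rows to the driver one partition at a time instead of collect()-ing all of them."""
    if max_rows is not None:
        df = df.limit(max_rows)  # e.g. plot only a subset while testing
    return df.toLocalIterator()
```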

Tags: python, pyspark. Stack Overflow question: Applying User defined function (UDF) to entire row of dataframe