
I am quite new to PySpark, though a frequent user of pandas etc.

I have a Spark dataframe consisting of the columns ID, Trajectory and types,

where ID is an integer, Trajectory is a list of rows containing coordinate information, and types is a list of strings.

The goal is, for each ID, to draw a polyline on a Python folium map based on the coordinates in the Trajectory column.

As a reference, df.first() produces the following output:

Row(ID=209223000, Trajectory=[Row(Timestamp=datetime.datetime(2024, 11, 1, 2, 58, 29), Latitude=55.108018, Longitude=18.306343, Destination='Unknown'), Row(Timestamp=datetime.datetime(2024, 11, 1, 6, 18, 18), Latitude=55.115625, Longitude=18.103083, Destination='Unknown')], types=['Undefined'])

To be clear, I have no need for an extra column being added to the dataframe, but no objections if that is needed either.

So the pseudocode would look something like this:

create map-object m

for each row in df:
    create polyline object ply
    ply.coordinates = [(item.Latitude, item.Longitude) for item in row['Trajectory']]
    ply.tooltip = str(row['ID']) + str(row['types'])
    add ply to m
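
For reference, a runnable driver-side version of that idea might look like the following (just a sketch: it assumes folium's standard PolyLine API and that the filtered dataframe is small enough to collect()):

import folium

m = folium.Map()

# collect() pulls every row to the driver, so this only works for modest result sets
for row in df.collect():
    coords = [(p.Latitude, p.Longitude) for p in row['Trajectory']]
    folium.PolyLine(coords, tooltip=f"{row['ID']} {row['types']}").add_to(m)

m.save("trajectories.html")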

The end goal is to showcase the tracks of objects depending on certain filters being applied to the dataframe.

I've tried to create a UDF with the pseudocode described, but I can't wrap my head around how to apply it, since a UDF is applied to a column rather than to a dataframe.

The expected result would be to add polylines to the map object based on each row of the dataframe, i.e. for each individual ID.
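
From what I can tell, the usual trick for a whole-row UDF is to wrap all of the columns in a struct, something like the sketch below; although since the folium map object lives on the driver, a UDF running on the executors could not add polylines to it anyway:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(returnType=StringType())
def make_tooltip(row):
    # the struct arrives inside the UDF as a Row with all original columns
    return f"{row['ID']} {row['types']}"

df.withColumn("tooltip", make_tooltip(F.struct(*df.columns))).show(truncate=False)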

EDIT:

Schema:

root
 |-- ID: integer (nullable = true)
 |-- Trajectory: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- Timestamp: timestamp (nullable = true)
 |    |    |-- Latitude: double (nullable = true)
 |    |    |-- Longitude: double (nullable = true)
 |    |    |-- Destination: string (nullable = true)
 |-- types: array (nullable = false)
 |    |-- element: string (containsNull = false)

A snapshot (compressed) of the data:

+---------+--------------------+--------------------+
|     ID  |          Trajectory|               types|
+---------+--------------------+--------------------+
|209223000|[{2024-11-01 02:5...|         [Undefined]|
|209508000|[{2024-11-01 08:2...| [Tanker, Undefined]|
|209864000|[{2024-11-01 14:4...|[Passenger, Undef...|
|210095000|[{2024-11-01 08:2...|[Passenger, Undef...|
|210350000|[{2024-11-01 00:0...|         [Undefined]|
+---------+--------------------+--------------------+
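
Given that schema, one way to cut down what has to leave the JVM is to project the nested Trajectory structs to bare coordinate pairs before iterating (a sketch, assuming Spark >= 3.1 for F.transform):

from pyspark.sql import functions as F

# keep only what the map needs: ID, (lat, lon) pairs and types
slim = df.select(
    "ID",
    F.transform("Trajectory", lambda p: F.array(p["Latitude"], p["Longitude"])).alias("coords"),
    "types",
)

# toLocalIterator() streams partitions to the driver one at a time instead of all at once
for row in slim.toLocalIterator():
    print(row["ID"], row["coords"][0], row["types"])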

EDIT no2:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AIS_Data_Analysis").getOrCreate()

# exact file as used is available at "/"
datapath = "AIS_data"
filename = "aisdk-2024-11-01.csv"

path = os.path.join(datapath, filename)
df_pyspark = spark.read.csv(path, header=True, inferSchema=True)
df_pyspark.printSchema()
df_pyspark.collect()

produces:

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\IPython\core\interactiveshell.py", line 3579, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\seolase1\AppData\Local\Temp\ipykernel_16900\381113642.py", line 1, in <module>
    df_pyspark.collect()
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\pyspark\sql\dataframe.py", line 1263, in collect
    sock_info = self._jdf.collectToPython()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\pyspark\errors\exceptions\captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.11_3.11.2544.0_x64__qbz5n2kfra8p0\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
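
For what it's worth, a connection reset like this seems to just mean the JVM side died mid-call; collecting a full day of AIS records to the driver could plausibly exhaust its memory. A sketch of two gentler alternatives, in case that is the cause:

# inspect a handful of rows without materialising the whole CSV on the driver
df_pyspark.limit(5).collect()

# or give the driver more headroom up front (the 8g value is a guess; size it to the machine,
# and note driver memory must be configured before the JVM starts)
spark = (
    SparkSession.builder
    .appName("AIS_Data_Analysis")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)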

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\seolase1\Code\TrAISformer-main\.venv\Lib\site-packages\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
