I would like to shift the values of a column down by one row and have the last element wrap around to the top (a "roll"). The lag window function almost accomplishes what I want:

df.select($"sample_column", lag("sample_column", 1).over(windowSpec).as("lag")).show()
+-----------------+----+
|sample_column    | lag|
+-----------------+----+
|                0|NULL|
|                1|   0|
|                2|   1|
|                3|   2|
|                4|   3|
|                5|   4|
|                6|   5|
|                7|   6|
|                8|   7|
|                9|   8|
|               10|   9|
|               11|  10|
|               12|  11|
|               13|  12|
|               14|  13|
|               15|  14|
|               16|  15|
|               17|  16|
|               18|  17|
|               19|  18|
+-----------------+----+
only showing top 20 rows

However, if I try to fill in the NULL with the following command

df.select($"sample_column", lag("sample_column", 1, last("__index__level_0__").over(windowSpec)).over(windowSpec).as("lag")).show()

I get the error:

org.apache.spark.SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for 'last(__index__level_0__) OVER (PARTITION BY file_name ORDER BY sample_column ASC NULLS FIRST unspecifiedframe$())' of class org.apache.spark.sql.Column.
  org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:320)
  org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
  org.apache.spark.sql.functions$.lag(functions.scala:1487)
  org.apache.spark.sql.functions$.lag(functions.scala:1471)
  org.apache.spark.sql.functions$.lag(functions.scala:1457)
  ammonite.$sess.cmd10$Helper.<init>(cmd10.sc:1)
  ammonite.$sess.cmd10$.<init>(cmd10.sc:7)
  ammonite.$sess.cmd10$.<clinit>(cmd10.sc:-1)

How can I do this correctly? Does Spark already have a built-in implementation for it?

Asked Mar 14 at 11:45 by Ícaro Lorran

1 Answer

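lag expects a literal as its default value, so a Column such as a window expression cannot be passed there. If the column doesn't contain any nulls, you can instead combine lag and last through coalesce, like this: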


import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, last}

val df = spark.sparkContext.parallelize(Seq(
  (0),
  (1),
  (2),
  (3),
  (4),
  (5),
  (6),
  (7)
)).toDF("sample_column")

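// A single global window ordered by the column (no partitioning).
val window = Window.orderBy($"sample_column")
df.select(
   $"sample_column",
   coalesce(
      // Previous row's value; NULL only on the first row.
      lag("sample_column", 1).over(window),
      // Fallback for the first row: the last value between the current row and the end.
      last("sample_column").over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing))
   ).as("lag")
).orderBy($"sample_column")
.show()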
+-------------+---+
|sample_column|lag|
+-------------+---+
|            0|  7|
|            1|  0|
|            2|  1|
|            3|  2|
|            4|  3|
|            5|  4|
|            6|  5|
|            7|  6|
+-------------+---+

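This works because, with no nulls in the column, the lag is NULL only on the first row. If the column does contain nulls, coalesce would also replace lagged values that are legitimately NULL, so you might have to separate the null rows out, compute the lag column for the non-null rows, and union the data frames.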
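
One possible alternative to splitting and unioning is to pick the first row by position instead of by NULL. The following is only a sketch, meant for a spark-shell or Ammonite session like the one in the question. The sample data, the column names "pos" and "value", and the local SparkSession are assumptions made for illustration and do not come from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, last, row_number, when}

// Assumption: a local session for illustration; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[1]").appName("roll-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: "pos" defines the row order, "value" contains a null.
val df = Seq[(Int, Option[Int])]((0, Some(10)), (1, None), (2, Some(30)), (3, Some(40)))
  .toDF("pos", "value")

val ordered = Window.orderBy(col("pos"))
// Frame from the current row to the end of the window, used to reach the last value.
val toEnd = ordered.rowsBetween(Window.currentRow, Window.unboundedFollowing)

val rolled = df.select(
  col("pos"),
  col("value"),
  // Only the first row (row_number 1) takes the last value; all other rows
  // keep the plain lag, including lags that are legitimately NULL.
  when(row_number().over(ordered) === 1, last("value").over(toEnd))
    .otherwise(lag("value", 1).over(ordered))
    .as("rolled")
).orderBy(col("pos"))

rolled.show()
```

Because the first row is identified by row_number rather than by a NULL lag, a NULL in the data should simply move down one row like any other value. As in the answer above, a window without partitionBy pulls all rows into a single partition, so on large data you would probably want to add a partition column such as the question's file_name.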

Article tags: scala - How to do a "roll" in Spark - Stack Overflow