
Conversation

@zhengruifeng
Contributor

@zhengruifeng zhengruifeng commented Jan 30, 2026

What changes were proposed in this pull request?

Delay the initialization of PythonArrowInput

Why are the changes needed?

1. The initialization of PythonArrowInput happens too early

  // Use lazy val to initialize the fields before these are accessed in [[PythonArrowInput]]'s
  // constructor.
  override protected lazy val schema: StructType = _schema
  override protected lazy val timeZoneId: String = _timeZoneId

We have code like the above around the Python plans to work around arrowSchema being initialized with a null schema/timeZoneId.

If I revert that code, some Python UDF tests fail with the error Cannot invoke "org.apache.spark.sql.types.StructType.map(scala.Function1)" because "schema" is null, see https://github.com/zhengruifeng/spark/actions/runs/21478957166/job/61870333678

JVM stacktrace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2) (localhost executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.types.StructType.map(scala.Function1)" because "schema" is null
  at org.apache.spark.sql.util.ArrowUtils$.toArrowSchema(ArrowUtils.scala:316)
  at org.apache.spark.sql.execution.python.PythonArrowInput.$init$(PythonArrowInput.scala:66)
  at org.apache.spark.sql.execution.python.streaming.ApplyInPandasWithStatePythonRunner.<init>(ApplyInPandasWithStatePythonRunner.scala:68)
  at

The current fix is rather tricky and doesn't cover all Python nodes (e.g., we don't do it for ArrowPythonUDTFRunner, so the behavior there may be undefined). I suspect we still have similar issues with other vals in some cases, since vals overridden in subclasses are not always respected during trait initialization (Any treated as null, integers treated as zero, etc.).

To resolve this, I think we can change schema, timeZoneId, errorOnDuplicatedFieldNames, and largeVarTypes to defs, since they are only used once (to build the Arrow schema), and make allocator/root lazy vals.
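
A rough sketch of the intended shape of the trait after this change (the exact members and signatures of PythonArrowInput/ArrowUtils are assumptions here, not the actual Spark code):

  import org.apache.arrow.vector.VectorSchemaRoot
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.util.ArrowUtils

  trait PythonArrowInput {
    // Abstract defs instead of vals: they are evaluated on demand, so the
    // subclass override is always seen, never a not-yet-initialized field.
    protected def schema: StructType
    protected def timeZoneId: String
    protected def errorOnDuplicatedFieldNames: Boolean
    protected def largeVarTypes: Boolean

    // Used only once to build the Arrow schema, so it no longer has to run
    // during trait construction.
    protected lazy val arrowSchema = ArrowUtils.toArrowSchema(
      schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)

    // Allocated only if a concrete runner actually touches them.
    protected lazy val allocator = ArrowUtils.rootAllocator.newChildAllocator(
      "PythonArrowInput", 0, Long.MaxValue)
    protected lazy val root = VectorSchemaRoot.create(arrowSchema, allocator)
  }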

2. In the case of mixing an ArrowPythonRunner with GroupedPythonArrowInput, like:

val runner = new ArrowPythonWithNamedArgumentRunner(
  pyFuncs,
  evalType,
  argMetas,
  pythonInputSchema,
  sessionLocalTimeZone,
  largeVarTypes,
  pythonRunnerConf,
  pythonMetrics,
  jobArtifactUUID,
  sessionUUID) with GroupedPythonArrowInput

The runner here actually inherits the allocator/root from ArrowPythonWithNamedArgumentRunner -> RowInputArrowPythonRunner -> BasicPythonArrowInput -> PythonArrowInput, but the allocator and root are never used in this runner, because GroupedPythonArrowInput overrides the newWriter method and creates a separate allocator/root in its call to createAndStartArrowWriter.

In this case, we also need to make allocator and root lazy to avoid unnecessary resource allocation.
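
To make that concrete, here is a small self-contained Scala illustration (the names are hypothetical, not the Spark classes): a lazy val in a base trait is only allocated when something forces it, so a mixin that overrides newWriter never triggers the allocation.

  trait BaseInput {
    lazy val buffer: Array[Byte] = {
      println("allocating base buffer")     // never printed when the mixin is used
      new Array[Byte](1 << 20)
    }
    def newWriter(): String = { buffer; "base writer" }
  }

  trait GroupedInput extends BaseInput {
    // Creates and manages its own resources instead of using `buffer`.
    override def newWriter(): String = "grouped writer"
  }

  class ArrowRunner extends BaseInput       // stands in for ArrowPythonWithNamedArgumentRunner

  object LazyMixinDemo extends App {
    val runner = new ArrowRunner with GroupedInput
    println(runner.newWriter())             // "grouped writer"; buffer is never allocated
  }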

Does this PR introduce any user-facing change?

Undefined behavior / failures caused by the odd initialization order -> the subclass's overrides are now clearly respected.

How was this patch tested?

CI

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions

github-actions bot commented Jan 30, 2026

JIRA Issue Information

=== Improvement SPARK-55285 ===
Summary: Fix the initialization of PythonArrowInput
Assignee: None
Status: Open
Affected: ["4.2"]


This comment was automatically generated by GitHub Actions

@zhengruifeng zhengruifeng changed the title [WIP][SQL][PYTHON] Delay the initialization of PythonArrowInput [SPARK-55285][SQL][PYTHON] Delay the initialization of PythonArrowInput Jan 30, 2026
@zhengruifeng zhengruifeng marked this pull request as ready for review January 30, 2026 03:08
throw SparkException.internalError(
s"Unsupported Arrow compression codec: $other. Supported values: none, zstd, lz4")
}
protected val unloader = new VectorUnloader(root, true, codec, true)
Contributor Author


unloader is only used in BasicPythonArrowInput, so move it to the subclass
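
Roughly, the shape of the move (the trait relationship below is an assumption for illustration, not necessarily the actual Spark hierarchy):

  import org.apache.arrow.vector.VectorUnloader

  // The trait-level field
  //   protected val unloader = new VectorUnloader(root, true, codec, true)
  // moves out of PythonArrowInput into the only place that uses it, and can be
  // made lazy there so it is only built when first needed.
  trait BasicPythonArrowInput extends PythonArrowInput {
    protected lazy val unloader = new VectorUnloader(root, true, codec, true)
  }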

@zhengruifeng
Contributor Author

zhengruifeng commented Jan 30, 2026

A simple example to illustrate the initialization order issue:

scala> trait A {
       val a: Int
       val b = a + 1
       }
// defined trait A

scala> class B extends A { override val a: Int = 2 }
// defined class B

scala> val b = new B
val b: B = B@1492c9d

scala> b.b
val res0: Int = 1

scala> trait A {
       def a: Int
       val b = a + 1
       }
// defined trait A

scala>

scala> class B extends A { override val a: Int = 2 }
// defined class B

scala> val b = new B
val b: B = B@3d741969

scala> b.b
val res1: Int = 1

scala>

scala> trait A {
       def a: Int
       def b = a + 1
       }
// defined trait A

scala> class B extends A { override val a: Int = 2 }
// defined class B

scala> val b = new B
val b: B = B@5fca8642

scala> b.b
val res2: Int = 3

scala> trait A {
       def a: Int
       lazy val b = a + 1
       }
// defined trait A

scala> class B extends A { override val a: Int = 2 }
// defined class B

scala> val b = new B
val b: B = B@2536edc3

scala> b.b
val res3: Int = 3

see https://docs.scala-lang.org/tutorials/FAQ/initialization-order.html for more details


private val sqlConf = SQLConf.get

// Use lazy val to initialize the fields before these are accessed in [[PythonArrowInput]]'s
Copy link
Contributor Author


We didn't apply such a fix to all Python runners.

@zhengruifeng zhengruifeng changed the title [SPARK-55285][SQL][PYTHON] Delay the initialization of PythonArrowInput [SPARK-55285][SQL][PYTHON] Fix the initialization of PythonArrowInput Jan 31, 2026
