@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
22
22
23
23
import com .codahale .metrics .MetricRegistry
24
24
import com .google .common .annotations .VisibleForTesting
25
+ import org .apache .commons .lang3 .StringUtils
25
26
26
27
import org .apache .kyuubi .{KyuubiException , KyuubiSQLException , Utils }
27
28
import org .apache .kyuubi .config .KyuubiConf
@@ -214,6 +215,20 @@ class BatchJobSubmission(
214
215
metadata match {
215
216
case Some (metadata) if metadata.peerInstanceClosed =>
216
217
setState(OperationState .CANCELED )
218
+ case Some (metadata)
219
+ // in case it has been updated by peer kyuubi instance, see KYUUBI-6278
220
+ if StringUtils .isNotBlank(metadata.engineState) &&
221
+ ApplicationState .isTerminated(ApplicationState .withName(metadata.engineState)) =>
222
+ _applicationInfo = Some (new ApplicationInfo (
223
+ id = metadata.engineId,
224
+ name = metadata.engineName,
225
+ state = ApplicationState .withName(metadata.engineState),
226
+ url = Option (metadata.engineUrl),
227
+ error = metadata.engineError))
228
+ if (applicationFailed(_applicationInfo, appOperation)) {
229
+ throw new KyuubiException (
230
+ s " $batchType batch[ $batchId] job failed: ${_applicationInfo}" )
231
+ }
217
232
case Some (metadata) if metadata.state == OperationState .PENDING .toString =>
218
233
// case 1: new batch job created using batch impl v2
219
234
// case 2: batch job from recovery, do submission only when previous state is
@@ -331,8 +346,7 @@ class BatchJobSubmission(
331
346
setStateIfNotCanceled(OperationState .RUNNING )
332
347
}
333
348
if (_applicationInfo.isEmpty) {
334
- info(s " The $batchType batch[ $batchId] job: $appId not found, assume that it has finished. " )
335
- return
349
+ _applicationInfo = Some (ApplicationInfo .NOT_FOUND )
336
350
}
337
351
if (applicationFailed(_applicationInfo, appOperation)) {
338
352
throw new KyuubiException (s " $batchType batch[ $batchId] job failed: ${_applicationInfo}" )
0 commit comments