#Flink ShutdownHookManager throws Trying to access closed classloader when destroying itself

hasty plaza
#

I'm working on a Flink application that has a Kinesis consumer and an Elasticsearch sink. My Flink job works fine and I get the results I want, except that at the end of application execution, when the shutdownExecutor method of org.apache.hadoop.util.ShutdownHookManager is called, I get this error:

Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
    at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2830)
    at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3104)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3063)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:3036)
    at org.apache.hadoop.conf.Configuration.loadProps(Configuration.java:2914)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2896)
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:1246)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1863)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
    at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
    at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
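
The exception message itself names a configuration switch. As a minimal sketch, assuming the job reads its settings from a flink-conf.yaml file (the file name is an assumption and differs between Flink versions and deployments), the check could be turned off as below. This only silences the safety check; it does not remove the underlying reference to the closed class loader.

```yaml
# Assumed location: flink-conf.yaml. The key is taken verbatim from the error message.
classloader.check-leaked-classloader: false
```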

I traced it to the shutdown executor retrieving its shutdown timeout:

  • shutdownExecutor calls getShutdownTimeout(conf).
  • conf is a default configuration created with new Configuration().
  • The exception is thrown in conf.getTimeDuration(...), when the Configuration class goes through the Flink user-code class loader, whose inner field is null at that point.
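
The failure mode described above can be reproduced in miniature with plain JDK classes. This is only a sketch: ClosableLoader and MiniConfiguration are hypothetical stand-ins for Flink's safety-net wrapper and Hadoop's Configuration, written on the assumption that the configuration object picks up the thread context class loader when it is constructed and that the wrapper throws once its inner loader is gone.

```java
import java.net.URL;

public class ClosedLoaderDemo {

    /** Hypothetical stand-in for the safety-net wrapper: delegates until closed, then throws. */
    static final class ClosableLoader extends ClassLoader {
        private volatile ClassLoader inner;

        ClosableLoader(ClassLoader inner) {
            super(null);
            this.inner = inner;
        }

        /** Drops the inner loader, like a user-code class loader released at job end. */
        void close() {
            inner = null;
        }

        @Override
        public URL getResource(String name) {
            ClassLoader target = inner;
            if (target == null) {
                throw new IllegalStateException("Trying to access closed classloader.");
            }
            return target.getResource(name);
        }
    }

    /** Hypothetical stand-in for a configuration class that captures the context class loader. */
    static final class MiniConfiguration {
        private final ClassLoader loader;

        MiniConfiguration() {
            ClassLoader ctx = Thread.currentThread().getContextClassLoader();
            this.loader = (ctx != null) ? ctx : MiniConfiguration.class.getClassLoader();
        }

        URL locate(String resource) {
            return loader.getResource(resource);
        }
    }

    /** Builds a configuration under the given context class loader and reports what happens. */
    static String lookupOutcome(ClassLoader contextLoader) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(contextLoader);
        try {
            new MiniConfiguration().locate("core-default.xml");
            return "ok";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClosableLoader userCode = new ClosableLoader(ClassLoader.getSystemClassLoader());
        userCode.close();
        // Closed wrapper as context class loader: the resource lookup throws.
        System.out.println(lookupOutcome(userCode));
        // System class loader as context class loader: the lookup completes.
        System.out.println(lookupOutcome(ClassLoader.getSystemClassLoader()));
    }
}
```

With the closed wrapper as the context class loader the lookup fails with the same message as in the stack trace; with the system class loader it completes.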

Does anyone know how to configure a Flink application so that the inner field of the FlinkUserCodeClassLoader is anything but null when the default Configuration object is used?

high brambleBOT
#

This post has been reserved for your question.

Hey @hasty plaza! Please use /close or the Close Post button above when you're finished. Please remember to follow the help guidelines. This post will be automatically closed after 300 minutes of inactivity.

TIP: Narrow down your issue to simple and precise questions to maximize the chance that others will reply in here.

high brambleBOT
#

💤 Post marked as dormant

This post has been inactive for over 300 minutes, thus, it has been archived.
If your question was not answered yet, feel free to re-open this post or create a new one.

hasty plaza
#

Resolved my problem by adding:

Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader())

to my sink.

I think it forces my job to use the system class loader instead of Flink's dynamic user-code class loader, which for some reason had already been closed by the time the shutdown executor tried to use it for its own shutdown.
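
For anyone who wants to try the same workaround without changing the context class loader permanently, here is a minimal sketch using only the JDK. The helper name withSystemClassLoader is hypothetical, and this variant restores the previous loader afterwards, which the one-line fix above does not do. Whether a scoped swap is enough depends on where the Hadoop classes are first initialized, which this thread does not establish.

```java
import java.util.function.Supplier;

public class SystemLoaderScope {

    /**
     * Runs the action with the system class loader as the thread context
     * class loader, then restores whatever loader was set before.
     */
    static <T> T withSystemClassLoader(Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(ClassLoader.getSystemClassLoader());
        try {
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Inside the scope, the context class loader is the system class loader.
        ClassLoader seen = withSystemClassLoader(
                () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == ClassLoader.getSystemClassLoader());
    }
}
```

In a sink, the code that first touches the third-party client would go inside the Supplier passed to the helper.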