/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.factories;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.FallbackKey;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.DecodingFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.EncodingFormatFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FormatFactory;
import org.apache.flink.table.factories.ManagedTableFactory;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.NoMatchingTableFactoryException;
import org.apache.flink.table.factories.ServiceLoaderUtil;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.module.CommonModuleOptions;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public final class FactoryUtil {
    /** Logger used while discovering factories. */
    private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);

    /** Version of the overall property design; defaults to {@code 1}. */
    public static final ConfigOption<Integer> PROPERTY_VERSION =
            ConfigOptions.key("property-version")
                    .intType()
                    .defaultValue(1)
                    .withDescription(
                            "Version of the overall property design. This option is meant for future backwards compatibility.");

    /** Identifier of the connector; read during table source and sink factory discovery. */
    public static final ConfigOption<String> CONNECTOR =
            ConfigOptions.key("connector")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Uniquely identifies the connector of a dynamic table that is used for accessing data in an external system. Its value is used during table source and table sink discovery.");

    /** Identifier of the format; used to discover a format factory. */
    public static final ConfigOption<String> FORMAT =
            ConfigOptions.key("format")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Defines the format identifier for encoding data. The identifier is used to discover a suitable format factory.");

    /** Optional custom parallelism for a sink; has no default value. */
    public static final ConfigOption<Integer> SINK_PARALLELISM =
            ConfigOptions.key("sink.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.");

    /** Endpoint types of the SQL gateway; defaults to the single entry {@code "rest"}. */
    public static final ConfigOption<List<String>> SQL_GATEWAY_ENDPOINT_TYPE =
            ConfigOptions.key("sql-gateway.endpoint.type")
                    .stringType()
                    .asList()
                    .defaultValues(new String[] {"rest"})
                    .withDescription("Specify the endpoints that are used.");

    /**
     * Suffix that a format option key must carry, unless it equals the key of {@link #FORMAT}, to
     * be accepted by {@link #getFormatPrefix(ConfigOption, String)}.
     */
    public static final String FORMAT_SUFFIX = ".format";

    /**
     * Required options that have a key or fallback key containing this symbol are skipped by the
     * missing-option check in {@link #validateFactoryOptions(Set, Set, ReadableConfig)}.
     */
    public static final String PLACEHOLDER_SYMBOL = "#";

    /**
     * Creates a {@link DynamicTableSource} for the given catalog table.
     *
     * <p>The {@code preferredFactory} is used when it is non-null; otherwise a factory is
     * discovered from the table options.
     *
     * @throws ValidationException wrapping any failure, with the table options listed in the
     *     message
     */
    public static DynamicTableSource createDynamicTableSource(@Nullable DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String, String> enrichmentOptions, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) {
        final DefaultDynamicTableContext tableContext =
                new DefaultDynamicTableContext(
                        objectIdentifier,
                        catalogTable,
                        enrichmentOptions,
                        configuration,
                        classLoader,
                        isTemporary);
        try {
            final DynamicTableSourceFactory sourceFactory;
            if (preferredFactory == null) {
                sourceFactory =
                        FactoryUtil.discoverTableFactory(
                                DynamicTableSourceFactory.class, tableContext);
            } else {
                sourceFactory = preferredFactory;
            }
            return sourceFactory.createDynamicTableSource(tableContext);
        } catch (Throwable cause) {
            final String tableName = objectIdentifier.asSummaryString();
            final String renderedOptions =
                    catalogTable.getOptions().entrySet().stream()
                            .map(entry -> FactoryUtil.stringifyOption(entry.getKey(), entry.getValue()))
                            .sorted()
                            .collect(Collectors.joining("\n"));
            throw new ValidationException(
                    String.format(
                            "Unable to create a source for reading table '%s'.\n\nTable options are:\n\n%s",
                            tableName, renderedOptions),
                    cause);
        }
    }

    /**
     * Creates a {@link DynamicTableSource} without enrichment options.
     *
     * @deprecated Delegates to the overload that takes enrichment options, passing an empty map.
     */
    @Deprecated
    public static DynamicTableSource createDynamicTableSource(@Nullable DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) {
        final Map<String, String> noEnrichmentOptions = Collections.emptyMap();
        return FactoryUtil.createDynamicTableSource(
                preferredFactory,
                objectIdentifier,
                catalogTable,
                noEnrichmentOptions,
                configuration,
                classLoader,
                isTemporary);
    }

    /**
     * Creates a {@link DynamicTableSource}, taking the factory from the catalog when it provides
     * a matching one and falling back to discovery otherwise.
     *
     * @deprecated Resolves the factory itself and then delegates to {@code
     *     createDynamicTableSource} with empty enrichment options.
     */
    @Deprecated
    public static DynamicTableSource createTableSource(@Nullable Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) {
        final DefaultDynamicTableContext lookupContext =
                new DefaultDynamicTableContext(
                        objectIdentifier,
                        catalogTable,
                        Collections.emptyMap(),
                        configuration,
                        classLoader,
                        isTemporary);
        // Resolved outside of createDynamicTableSource, so lookup failures are not re-wrapped there.
        final DynamicTableSourceFactory resolvedFactory =
                FactoryUtil.getDynamicTableFactory(
                        DynamicTableSourceFactory.class, catalog, lookupContext);
        return FactoryUtil.createDynamicTableSource(
                resolvedFactory,
                objectIdentifier,
                catalogTable,
                Collections.emptyMap(),
                configuration,
                classLoader,
                isTemporary);
    }

    /**
     * Creates a {@link DynamicTableSink} for the given catalog table.
     *
     * <p>The {@code preferredFactory} is used when it is non-null; otherwise a factory is
     * discovered from the table options.
     *
     * @throws ValidationException wrapping any failure, with the table options listed in the
     *     message
     */
    public static DynamicTableSink createDynamicTableSink(@Nullable DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String, String> enrichmentOptions, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) {
        final DefaultDynamicTableContext tableContext =
                new DefaultDynamicTableContext(
                        objectIdentifier,
                        catalogTable,
                        enrichmentOptions,
                        configuration,
                        classLoader,
                        isTemporary);
        try {
            final DynamicTableSinkFactory sinkFactory;
            if (preferredFactory == null) {
                sinkFactory =
                        FactoryUtil.discoverTableFactory(
                                DynamicTableSinkFactory.class, tableContext);
            } else {
                sinkFactory = preferredFactory;
            }
            return sinkFactory.createDynamicTableSink(tableContext);
        } catch (Throwable cause) {
            final String tableName = objectIdentifier.asSummaryString();
            final String renderedOptions =
                    catalogTable.getOptions().entrySet().stream()
                            .map(entry -> FactoryUtil.stringifyOption(entry.getKey(), entry.getValue()))
                            .sorted()
                            .collect(Collectors.joining("\n"));
            throw new ValidationException(
                    String.format(
                            "Unable to create a sink for writing table '%s'.\n\nTable options are:\n\n%s",
                            tableName, renderedOptions),
                    cause);
        }
    }

    /**
     * Creates a {@link DynamicTableSink} without enrichment options.
     *
     * @deprecated Delegates to the overload that takes enrichment options, passing an empty map.
     */
    @Deprecated
    public static DynamicTableSink createDynamicTableSink(@Nullable DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) {
        final Map<String, String> noEnrichmentOptions = Collections.emptyMap();
        return FactoryUtil.createDynamicTableSink(
                preferredFactory,
                objectIdentifier,
                catalogTable,
                noEnrichmentOptions,
                configuration,
                classLoader,
                isTemporary);
    }

    /**
     * Creates a {@link DynamicTableSink}, taking the factory from the catalog when it provides a
     * matching one and falling back to discovery otherwise.
     *
     * @deprecated Resolves the factory itself and then delegates to {@code
     *     createDynamicTableSink} with empty enrichment options.
     */
    @Deprecated
    public static DynamicTableSink createTableSink(@Nullable Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary) {
        final DefaultDynamicTableContext lookupContext =
                new DefaultDynamicTableContext(
                        objectIdentifier,
                        catalogTable,
                        Collections.emptyMap(),
                        configuration,
                        classLoader,
                        isTemporary);
        // Resolved outside of createDynamicTableSink, so lookup failures are not re-wrapped there.
        final DynamicTableSinkFactory resolvedFactory =
                FactoryUtil.getDynamicTableFactory(
                        DynamicTableSinkFactory.class, catalog, lookupContext);
        return FactoryUtil.createDynamicTableSink(
                resolvedFactory,
                objectIdentifier,
                catalogTable,
                Collections.emptyMap(),
                configuration,
                classLoader,
                isTemporary);
    }

    /** Creates a {@link CatalogFactoryHelper} for the given catalog factory and context. */
    public static CatalogFactoryHelper createCatalogFactoryHelper(CatalogFactory factory, CatalogFactory.Context context) {
        final CatalogFactoryHelper helper = new CatalogFactoryHelper(factory, context);
        return helper;
    }

    /** Creates a {@link ModuleFactoryHelper} for the given module factory and context. */
    public static ModuleFactoryHelper createModuleFactoryHelper(ModuleFactory factory, ModuleFactory.Context context) {
        final ModuleFactoryHelper helper = new ModuleFactoryHelper(factory, context);
        return helper;
    }

    /** Creates a {@link TableFactoryHelper} for the given table factory and context. */
    public static TableFactoryHelper createTableFactoryHelper(DynamicTableFactory factory, DynamicTableFactory.Context context) {
        final TableFactoryHelper helper = new TableFactoryHelper(factory, context);
        return helper;
    }

    /**
     * Creates a {@link Catalog} with the given name and options.
     *
     * <p>A legacy factory lookup through {@link TableFactoryService} is tried first. Only when
     * that raises {@link NoMatchingTableFactoryException} is a {@link CatalogFactory} discovered
     * via the catalog type option; that factory receives the options without the type entry.
     *
     * @throws ValidationException wrapping any failure of the non-legacy path, with the catalog
     *     options listed in the message
     */
    public static Catalog createCatalog(String catalogName, Map<String, String> options, ReadableConfig configuration, ClassLoader classLoader) {
        try {
            return TableFactoryService.find(CatalogFactory.class, options, classLoader)
                    .createCatalog(catalogName, options);
        } catch (NoMatchingTableFactoryException noLegacyFactory) {
            final DefaultCatalogContext discoveryContext =
                    new DefaultCatalogContext(catalogName, options, configuration, classLoader);
            try {
                final CatalogFactory catalogFactory =
                        FactoryUtil.getCatalogFactory(discoveryContext);
                // The type option only selects the factory; it is not handed to the factory.
                final Map<String, String> factoryOptions =
                        options.entrySet().stream()
                                .filter(
                                        entry ->
                                                !CommonCatalogOptions.CATALOG_TYPE
                                                        .key()
                                                        .equals(entry.getKey()))
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                return catalogFactory.createCatalog(
                        new DefaultCatalogContext(
                                catalogName, factoryOptions, configuration, classLoader));
            } catch (Throwable cause) {
                final String renderedOptions =
                        options.entrySet().stream()
                                .map(entry -> FactoryUtil.stringifyOption(entry.getKey(), entry.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"));
                throw new ValidationException(
                        String.format(
                                "Unable to create catalog '%s'.%n%nCatalog options are:%n%s",
                                catalogName, renderedOptions),
                        cause);
            }
        }
    }

    /**
     * Creates a {@link Module} for the given module name and options.
     *
     * <p>The options must not contain the module type key, because the module name is used as the
     * type. A legacy factory lookup through {@link TableFactoryService} is tried first, with the
     * module name added under the type key. Only when that raises {@link
     * NoMatchingTableFactoryException} is a {@link ModuleFactory} discovered by using the module
     * name as the factory identifier.
     *
     * @throws ValidationException if the options contain the module type key, or wrapping any
     *     failure of the non-legacy path
     */
    public static Module createModule(String moduleName, Map<String, String> options, ReadableConfig configuration, ClassLoader classLoader) {
        final String moduleTypeKey = CommonModuleOptions.MODULE_TYPE.key();
        if (options.containsKey(moduleTypeKey)) {
            throw new ValidationException(
                    String.format(
                            "Option '%s' = '%s' is not supported since module name is used to find module",
                            moduleTypeKey, options.get(moduleTypeKey)));
        }
        try {
            final Map<String, String> legacyOptions = new HashMap<>(options);
            legacyOptions.put(moduleTypeKey, moduleName);
            return TableFactoryService.find(ModuleFactory.class, legacyOptions, classLoader)
                    .createModule(legacyOptions);
        } catch (NoMatchingTableFactoryException noLegacyFactory) {
            final DefaultModuleContext discoveryContext =
                    new DefaultModuleContext(options, configuration, classLoader);
            try {
                final ModuleFactory moduleFactory =
                        FactoryUtil.discoverFactory(
                                discoveryContext.getClassLoader(), ModuleFactory.class, moduleName);
                return moduleFactory.createModule(
                        new DefaultModuleContext(options, configuration, classLoader));
            } catch (Throwable cause) {
                final String renderedOptions =
                        options.entrySet().stream()
                                .map(entry -> FactoryUtil.stringifyOption(entry.getKey(), entry.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"));
                throw new ValidationException(
                        String.format(
                                "Unable to create module '%s'.%n%nModule options are:%n%s",
                                moduleName, renderedOptions),
                        cause);
            }
        }
    }

    /**
     * Discovers the single factory of the given class that has the given identifier.
     *
     * @param classLoader class loader used to load the available factories
     * @param factoryClass type the factory must implement
     * @param factoryIdentifier identifier the factory must report via {@code factoryIdentifier()}
     * @return the unique matching factory
     * @throws ValidationException if no factory implements {@code factoryClass}, if none of them
     *     has the requested identifier, or if more than one has it
     */
    public static <T extends Factory> T discoverFactory(ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
        final List<Factory> factories = FactoryUtil.discoverFactories(classLoader);

        // Keep the element type so the later lambdas are type-checked and no unchecked cast is
        // needed on return.
        final List<T> foundFactories =
                factories.stream()
                        .filter(factoryClass::isInstance)
                        .map(factoryClass::cast)
                        .collect(Collectors.toList());
        if (foundFactories.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "Could not find any factories that implement '%s' in the classpath.",
                            factoryClass.getName()));
        }

        final List<T> matchingFactories =
                foundFactories.stream()
                        .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
                        .collect(Collectors.toList());
        if (matchingFactories.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\nAvailable factory identifiers are:\n\n%s",
                            factoryIdentifier,
                            factoryClass.getName(),
                            foundFactories.stream()
                                    .map(Factory::factoryIdentifier)
                                    // The "default" identifier is not listed as an available choice.
                                    .filter(identifier -> !"default".equals(identifier))
                                    .distinct()
                                    .sorted()
                                    .collect(Collectors.joining("\n"))));
        }
        if (matchingFactories.size() > 1) {
            throw new ValidationException(
                    String.format(
                            "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\nAmbiguous factory classes are:\n\n%s",
                            factoryIdentifier,
                            factoryClass.getName(),
                            matchingFactories.stream()
                                    .map(f -> f.getClass().getName())
                                    .sorted()
                                    .collect(Collectors.joining("\n"))));
        }
        return matchingFactories.get(0);
    }

    /**
     * Validates the given options against the required and optional options declared by the
     * factory.
     */
    public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
        final Set<ConfigOption<?>> required = factory.requiredOptions();
        final Set<ConfigOption<?>> optional = factory.optionalOptions();
        FactoryUtil.validateFactoryOptions(required, optional, options);
    }

    /**
     * Validates that all required options are present and that every option value can be read.
     *
     * <p>Required options that have a key or fallback key containing {@link #PLACEHOLDER_SYMBOL}
     * are excluded from the missing-option check.
     *
     * @param requiredOptions options that must resolve to a non-null value
     * @param optionalOptions options that are read only to verify that their value is valid
     * @param options configuration to validate
     * @throws ValidationException if a required option is missing or any option value cannot be
     *     read
     */
    public static void validateFactoryOptions(Set<ConfigOption<?>> requiredOptions, Set<ConfigOption<?>> optionalOptions, ReadableConfig options) {
        final List<String> missingRequiredOptions =
                requiredOptions.stream()
                        .filter(
                                option ->
                                        FactoryUtil.allKeys(option)
                                                .noneMatch(k -> k.contains(PLACEHOLDER_SYMBOL)))
                        .filter(option -> FactoryUtil.readOption(options, option) == null)
                        .map(ConfigOption::key)
                        .sorted()
                        .collect(Collectors.toList());
        if (!missingRequiredOptions.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "One or more required options are missing.\n\nMissing required options are:\n\n%s",
                            String.join("\n", missingRequiredOptions)));
        }
        // Reading each optional option surfaces invalid values; the result itself is not needed.
        optionalOptions.forEach(option -> FactoryUtil.readOption(options, option));
    }

    /**
     * Validates that every option key has been consumed.
     *
     * @param factoryIdentifier identifier used in the error message
     * @param allOptionKeys all option keys that were provided
     * @param consumedOptionKeys option keys that are supported
     * @param deprecatedOptionKeys supported keys that are marked as deprecated in the message
     * @throws ValidationException if {@code allOptionKeys} contains keys that are not in {@code
     *     consumedOptionKeys}
     */
    public static void validateUnconsumedKeys(String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys, Set<String> deprecatedOptionKeys) {
        final Set<String> unsupportedKeys = new HashSet<>(allOptionKeys);
        unsupportedKeys.removeAll(consumedOptionKeys);
        if (unsupportedKeys.isEmpty()) {
            return;
        }

        final String unsupportedListing =
                unsupportedKeys.stream().sorted().collect(Collectors.joining("\n"));
        final String supportedListing =
                consumedOptionKeys.stream()
                        .map(
                                key ->
                                        deprecatedOptionKeys.contains(key)
                                                ? String.format("%s (deprecated)", key)
                                                : key)
                        .sorted()
                        .collect(Collectors.joining("\n"));
        throw new ValidationException(
                String.format(
                        "Unsupported options found for '%s'.\n\nUnsupported options:\n\n%s\n\nSupported options:\n\n%s",
                        factoryIdentifier, unsupportedListing, supportedListing));
    }

    /** Validates that every option key has been consumed, with no keys treated as deprecated. */
    public static void validateUnconsumedKeys(String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys) {
        final Set<String> noDeprecatedKeys = Collections.emptySet();
        FactoryUtil.validateUnconsumedKeys(
                factoryIdentifier, allOptionKeys, consumedOptionKeys, noDeprecatedKeys);
    }

    /**
     * Returns the option prefix under which the options of a format are located.
     *
     * <p>If the option key equals the key of {@link #FORMAT}, the prefix is {@code
     * "<formatIdentifier>."}. If the option key ends with {@link #FORMAT_SUFFIX}, the prefix is
     * the key without that suffix, followed by {@code ".<formatIdentifier>."}.
     *
     * @throws ValidationException if the option key matches neither form
     */
    public static String getFormatPrefix(ConfigOption<String> formatOption, String formatIdentifier) {
        final String optionKey = formatOption.key();
        final boolean isPlainFormatKey = optionKey.equals(FORMAT.key());
        if (!isPlainFormatKey && !optionKey.endsWith(FORMAT_SUFFIX)) {
            throw new ValidationException("Format identifier key should be 'format' or suffix with '.format', don't support format identifier key '" + optionKey + "'.");
        }
        if (isPlainFormatKey) {
            return formatIdentifier + ".";
        }
        final int prefixLength = optionKey.length() - FORMAT_SUFFIX.length();
        return optionKey.substring(0, prefixLength) + "." + formatIdentifier + ".";
    }

    /**
     * Returns the catalog-provided factory of the requested class if there is one; otherwise
     * discovers a factory from the table options of the context.
     */
    private static <T extends DynamicTableFactory> T getDynamicTableFactory(Class<T> factoryClass, @Nullable Catalog catalog, DynamicTableFactory.Context context) {
        final Optional<T> catalogFactory = FactoryUtil.getDynamicTableFactory(factoryClass, catalog);
        if (catalogFactory.isPresent()) {
            return catalogFactory.get();
        }
        return FactoryUtil.discoverTableFactory(factoryClass, context);
    }

    /**
     * Returns the factory provided by the catalog if it is an instance of the requested class.
     *
     * @param factoryClass type the catalog factory must implement
     * @param catalog catalog to query; may be null
     * @return the catalog factory, or an empty {@link Optional} if the catalog is null, provides
     *     no factory, or provides a factory of a different type
     */
    private static <T extends DynamicTableFactory> Optional<T> getDynamicTableFactory(Class<T> factoryClass, @Nullable Catalog catalog) {
        if (catalog == null) {
            return Optional.empty();
        }
        // filter + checked cast keeps the Optional typed as T instead of relying on a lambda that
        // returns null for non-matching factories.
        return catalog.getFactory().filter(factoryClass::isInstance).map(factoryClass::cast);
    }

    /**
     * Discovers a table factory of the requested class from the table options.
     *
     * <p>Without a connector option a managed table factory is looked up. Otherwise the connector
     * value is used as the factory identifier, and a failed lookup is reported with an enriched
     * error.
     */
    private static <T extends DynamicTableFactory> T discoverTableFactory(Class<T> factoryClass, DynamicTableFactory.Context context) {
        final String connectorIdentifier =
                context.getCatalogTable().getOptions().get(CONNECTOR.key());
        if (connectorIdentifier != null) {
            try {
                return FactoryUtil.discoverFactory(
                        context.getClassLoader(), factoryClass, connectorIdentifier);
            } catch (ValidationException discoveryFailure) {
                throw FactoryUtil.enrichNoMatchingConnectorError(
                        factoryClass, context, connectorIdentifier);
            }
        }
        return FactoryUtil.discoverManagedTableFactory(context.getClassLoader(), factoryClass);
    }

    /**
     * Discovers the {@link CatalogFactory} whose identifier equals the catalog type option.
     *
     * @throws ValidationException if the context options do not contain the catalog type key
     */
    private static CatalogFactory getCatalogFactory(CatalogFactory.Context context) {
        final String typeKey = CommonCatalogOptions.CATALOG_TYPE.key();
        final String catalogType = context.getOptions().get(typeKey);
        if (catalogType != null) {
            return FactoryUtil.discoverFactory(
                    context.getClassLoader(), CatalogFactory.class, catalogType);
        }
        throw new ValidationException(
                String.format(
                        "Catalog options do not contain an option key '%s' for discovering a catalog.",
                        CommonCatalogOptions.CATALOG_TYPE.key()));
    }

    /**
     * Builds a more specific error for a connector that could not be discovered as the requested
     * factory class.
     *
     * <p>The connector is looked up again as a plain {@link DynamicTableFactory}. If that lookup
     * fails too, its error is wrapped. Otherwise the message states whether the connector can
     * only be used as a sink, only as a source, or as neither.
     *
     * @return the exception to throw; this method does not throw it itself
     */
    private static ValidationException enrichNoMatchingConnectorError(Class<?> factoryClass, DynamicTableFactory.Context context, String connectorOption) {
        final DynamicTableFactory discovered;
        try {
            discovered =
                    FactoryUtil.discoverFactory(
                            context.getClassLoader(), DynamicTableFactory.class, connectorOption);
        } catch (ValidationException notFound) {
            return new ValidationException(
                    String.format(
                            "Cannot discover a connector using option: %s",
                            FactoryUtil.stringifyOption(CONNECTOR.key(), connectorOption)),
                    notFound);
        }

        final boolean sourceRequested = DynamicTableSourceFactory.class.equals(factoryClass);
        final boolean sinkRequested = DynamicTableSinkFactory.class.equals(factoryClass);
        final boolean isSourceFactory = DynamicTableSourceFactory.class.isInstance(discovered);
        final boolean isSinkFactory = DynamicTableSinkFactory.class.isInstance(discovered);

        final String message;
        if (sourceRequested && isSinkFactory) {
            message =
                    String.format(
                            "Connector '%s' can only be used as a sink. It cannot be used as a source.",
                            connectorOption);
        } else if (sinkRequested && isSourceFactory) {
            message =
                    String.format(
                            "Connector '%s' can only be used as a source. It cannot be used as a sink.",
                            connectorOption);
        } else {
            message =
                    String.format(
                            "Connector '%s' does neither implement the '%s' nor the '%s' interface.",
                            connectorOption,
                            DynamicTableSourceFactory.class.getName(),
                            DynamicTableSinkFactory.class.getName());
        }
        return new ValidationException(message);
    }

    /**
     * Discovers the single factory that implements both {@link ManagedTableFactory} and the given
     * class.
     *
     * @param classLoader class loader used to load the available factories
     * @param implementClass type the managed table factory must also implement
     * @return the unique matching factory
     * @throws ValidationException if no such factory or more than one such factory is found
     */
    static <T extends DynamicTableFactory> T discoverManagedTableFactory(ClassLoader classLoader, Class<T> implementClass) {
        final List<Factory> factories = FactoryUtil.discoverFactories(classLoader);

        // Typed list: the checked cast replaces the unchecked (T) cast on return.
        final List<T> foundFactories =
                factories.stream()
                        .filter(ManagedTableFactory.class::isInstance)
                        .filter(implementClass::isInstance)
                        .map(implementClass::cast)
                        .collect(Collectors.toList());
        if (foundFactories.isEmpty()) {
            throw new ValidationException(
                    String.format(
                            "Table options do not contain an option key 'connector' for discovering a connector. Therefore, Flink assumes a managed table. However, a managed table factory that implements %s is not in the classpath.",
                            implementClass.getName()));
        }
        if (foundFactories.size() > 1) {
            throw new ValidationException(
                    String.format(
                            "Multiple factories for managed table found in the classpath.\n\nAmbiguous factory classes are:\n\n%s",
                            foundFactories.stream()
                                    .map(f -> f.getClass().getName())
                                    .sorted()
                                    .collect(Collectors.joining("\n"))));
        }
        return foundFactories.get(0);
    }

    /**
     * Loads all {@link Factory} service providers visible to the given class loader.
     *
     * <p>Providers that fail to load with a {@link NoClassDefFoundError} are logged at debug
     * level and skipped.
     *
     * @param classLoader class loader passed to {@link ServiceLoaderUtil}
     * @return the successfully loaded factories
     * @throws TableException if a provider fails to load with any other error
     */
    static List<Factory> discoverFactories(ClassLoader classLoader) {
        // Append-only list that is read sequentially, so an ArrayList is sufficient.
        final List<Factory> result = new ArrayList<>();
        ServiceLoaderUtil.load(Factory.class, classLoader).forEach(loadResult -> {
            if (!loadResult.hasFailed()) {
                result.add((Factory)loadResult.getService());
                return;
            }
            if (loadResult.getError() instanceof NoClassDefFoundError) {
                // Parameterized message: the text is only built when debug logging is enabled.
                // The trailing throwable is logged as the exception, not as a format argument.
                LOG.debug("NoClassDefFoundError when loading a {}. This is expected when trying to load a format dependency but no flink-connector-files is loaded.", Factory.class, loadResult.getError());
                return;
            }
            throw new TableException("Unexpected error when trying to load service provider for factories.", loadResult.getError());
        });
        return result;
    }

    /**
     * Renders an option as {@code 'key'='value'} with single quotes escaped. The value is
     * replaced by {@code ******} when {@link GlobalConfiguration#isSensitive(String)} reports the
     * key as sensitive.
     */
    private static String stringifyOption(String key, String value) {
        final String displayedValue = GlobalConfiguration.isSensitive(key) ? "******" : value;
        final String escapedKey = EncodingUtils.escapeSingleQuotes(key);
        final String escapedValue = EncodingUtils.escapeSingleQuotes(displayedValue);
        return String.format("'%s'='%s'", escapedKey, escapedValue);
    }

    /**
     * Reads the option from the configuration.
     *
     * @throws ValidationException wrapping any failure raised while reading the value
     */
    private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
        final T value;
        try {
            value = options.get(option);
        } catch (Throwable cause) {
            throw new ValidationException(
                    String.format("Invalid value for option '%s'.", option.key()), cause);
        }
        return value;
    }

    /** Expands the keys of the option against the actual keys, using an empty prefix. */
    private static Set<String> allKeysExpanded(ConfigOption<?> option, Set<String> actualKeys) {
        final String noPrefix = "";
        return FactoryUtil.allKeysExpanded(noPrefix, option, actualKeys);
    }

    /**
     * Returns the prefixed key and fallback keys of the option. If {@link
     * ConfigurationUtils#canBePrefixMap} accepts the option, the result also contains those
     * actual keys that {@link ConfigurationUtils#filterPrefixMapKey} accepts for one of the
     * prefixed keys.
     */
    private static Set<String> allKeysExpanded(String prefix, ConfigOption<?> option, Set<String> actualKeys) {
        final Set<String> prefixedKeys =
                FactoryUtil.allKeys(option).map(key -> prefix + key).collect(Collectors.toSet());
        if (ConfigurationUtils.canBePrefixMap(option)) {
            final Stream<String> mapEntryKeys =
                    prefixedKeys.stream()
                            .flatMap(
                                    key ->
                                            actualKeys.stream()
                                                    .filter(
                                                            candidate ->
                                                                    ConfigurationUtils
                                                                            .filterPrefixMapKey(
                                                                                    key,
                                                                                    candidate)));
            return Stream.concat(prefixedKeys.stream(), mapEntryKeys).collect(Collectors.toSet());
        }
        return prefixedKeys;
    }

    /** Streams the key of the option followed by all of its fallback keys. */
    private static Stream<String> allKeys(ConfigOption<?> option) {
        final Stream<String> primaryKey = Stream.of(option.key());
        final Stream<String> fallbacks = FactoryUtil.fallbackKeys(option);
        return Stream.concat(primaryKey, fallbacks);
    }

    /** Streams the keys of all fallback keys declared by the option. */
    private static Stream<String> fallbackKeys(ConfigOption<?> option) {
        final Stream<FallbackKey> declared =
                StreamSupport.stream(option.fallbackKeys().spliterator(), false);
        return declared.map(FallbackKey::getKey);
    }

    /** Streams the keys of those fallback keys of the option that are flagged as deprecated. */
    private static Stream<String> deprecatedKeys(ConfigOption<?> option) {
        final Stream<FallbackKey> declared =
                StreamSupport.stream(option.fallbackKeys().spliterator(), false);
        return declared.filter(FallbackKey::isDeprecated).map(FallbackKey::getKey);
    }

    /** Private constructor: this class only exposes static members. */
    private FactoryUtil() {
    }

    /** Default {@link ModuleFactory.Context} that returns the values passed to its constructor. */
    @Internal
    public static class DefaultModuleContext implements ModuleFactory.Context {
        private final Map<String, String> options;
        private final ReadableConfig configuration;
        private final ClassLoader classLoader;

        /** Stores the given references as-is; the options map is not copied. */
        public DefaultModuleContext(
                Map<String, String> options,
                ReadableConfig configuration,
                ClassLoader classLoader) {
            this.options = options;
            this.configuration = configuration;
            this.classLoader = classLoader;
        }

        @Override
        public Map<String, String> getOptions() {
            return options;
        }

        @Override
        public ReadableConfig getConfiguration() {
            return configuration;
        }

        @Override
        public ClassLoader getClassLoader() {
            return classLoader;
        }
    }

    /** Default {@link CatalogFactory.Context} that returns the values passed to its constructor. */
    @Internal
    public static class DefaultCatalogContext implements CatalogFactory.Context {
        private final String name;
        private final Map<String, String> options;
        private final ReadableConfig configuration;
        private final ClassLoader classLoader;

        /** Stores the given references as-is; the options map is not copied. */
        public DefaultCatalogContext(
                String name,
                Map<String, String> options,
                ReadableConfig configuration,
                ClassLoader classLoader) {
            this.name = name;
            this.options = options;
            this.configuration = configuration;
            this.classLoader = classLoader;
        }

        @Override
        public String getName() {
            return name;
        }

        @Override
        public Map<String, String> getOptions() {
            return options;
        }

        @Override
        public ReadableConfig getConfiguration() {
            return configuration;
        }

        @Override
        public ClassLoader getClassLoader() {
            return classLoader;
        }
    }

    /**
     * Default {@link DynamicTableFactory.Context} that returns the values passed to its
     * constructor.
     */
    @Internal
    public static class DefaultDynamicTableContext implements DynamicTableFactory.Context {
        private final ObjectIdentifier objectIdentifier;
        private final ResolvedCatalogTable catalogTable;
        private final Map<String, String> enrichmentOptions;
        private final ReadableConfig configuration;
        private final ClassLoader classLoader;
        private final boolean isTemporary;

        /** Stores the given references as-is; the enrichment options map is not copied. */
        public DefaultDynamicTableContext(
                ObjectIdentifier objectIdentifier,
                ResolvedCatalogTable catalogTable,
                Map<String, String> enrichmentOptions,
                ReadableConfig configuration,
                ClassLoader classLoader,
                boolean isTemporary) {
            this.objectIdentifier = objectIdentifier;
            this.catalogTable = catalogTable;
            this.enrichmentOptions = enrichmentOptions;
            this.configuration = configuration;
            this.classLoader = classLoader;
            this.isTemporary = isTemporary;
        }

        @Override
        public ObjectIdentifier getObjectIdentifier() {
            return objectIdentifier;
        }

        @Override
        public ResolvedCatalogTable getCatalogTable() {
            return catalogTable;
        }

        @Override
        public Map<String, String> getEnrichmentOptions() {
            return enrichmentOptions;
        }

        @Override
        public ReadableConfig getConfiguration() {
            return configuration;
        }

        @Override
        public ClassLoader getClassLoader() {
            return classLoader;
        }

        @Override
        public boolean isTemporary() {
            return isTemporary;
        }
    }

    @PublicEvolving
    public static class TableFactoryHelper
    extends FactoryHelper<DynamicTableFactory> {
        private final DynamicTableFactory.Context context;
        private final Configuration enrichingOptions;

        /**
         * Creates a helper for the given table factory, based on the options of the context's
         * catalog table, and then forwards options from the context's enrichment options.
         */
        private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
            super(tableFactory, context.getCatalogTable().getOptions(), PROPERTY_VERSION, CONNECTOR);
            this.context = context;
            this.enrichingOptions = Configuration.fromMap(context.getEnrichmentOptions());
            // Must run last: forwardOptions() reads enrichingOptions, which is assigned just above.
            this.forwardOptions();
        }

        /** Returns the options as provided by the superclass implementation. */
        @Override
        public ReadableConfig getOptions() {
            return super.getOptions();
        }

        /**
         * Discovers the {@link DecodingFormat} configured under the given format option.
         *
         * @throws ValidationException if no format is configured for the option
         */
        public <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            final Optional<DecodingFormat<I>> decodingFormat =
                    this.discoverOptionalDecodingFormat(formatFactoryClass, formatOption);
            if (decodingFormat.isPresent()) {
                return decodingFormat.get();
            }
            throw new ValidationException(
                    String.format(
                            "Could not find required scan format '%s'.", formatOption.key()));
        }

        /**
         * Discovers the {@link DecodingFormat} configured under the given format option, if any.
         *
         * @return the created format, or an empty {@link Optional} if no format factory was
         *     discovered for the option
         * @throws ValidationException wrapping any failure raised while the factory creates the
         *     format
         */
        public <I, F extends DecodingFormatFactory<I>> Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            final Optional<F> discovered =
                    this.discoverOptionalFormatFactory(formatFactoryClass, formatOption);
            if (!discovered.isPresent()) {
                return Optional.empty();
            }
            final F formatFactory = discovered.get();
            final String optionSpace = this.formatPrefix((Factory)formatFactory, formatOption);
            try {
                return Optional.ofNullable(
                        formatFactory.createDecodingFormat(
                                this.context,
                                this.createFormatOptions(optionSpace, (FormatFactory)formatFactory)));
            } catch (Throwable cause) {
                throw new ValidationException(
                        String.format(
                                "Error creating scan format '%s' in option space '%s'.",
                                formatFactory.factoryIdentifier(), optionSpace),
                        cause);
            }
        }

        /**
         * Discovers the {@link EncodingFormat} configured under the given format option.
         *
         * @throws ValidationException if no format is configured for the option
         */
        public <I, F extends EncodingFormatFactory<I>> EncodingFormat<I> discoverEncodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            final Optional<EncodingFormat<I>> encodingFormat =
                    this.discoverOptionalEncodingFormat(formatFactoryClass, formatOption);
            if (encodingFormat.isPresent()) {
                return encodingFormat.get();
            }
            throw new ValidationException(
                    String.format(
                            "Could not find required sink format '%s'.", formatOption.key()));
        }

        /**
         * Discovers the {@link EncodingFormat} configured under the given format option, if any.
         *
         * @return the created format, or an empty {@link Optional} if no format factory was
         *     discovered for the option
         * @throws ValidationException wrapping any failure raised while the factory creates the
         *     format
         */
        public <I, F extends EncodingFormatFactory<I>> Optional<EncodingFormat<I>> discoverOptionalEncodingFormat(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            final Optional<F> discovered =
                    this.discoverOptionalFormatFactory(formatFactoryClass, formatOption);
            if (!discovered.isPresent()) {
                return Optional.empty();
            }
            final F formatFactory = discovered.get();
            final String optionSpace = this.formatPrefix((Factory)formatFactory, formatOption);
            try {
                return Optional.ofNullable(
                        formatFactory.createEncodingFormat(
                                this.context,
                                this.createFormatOptions(optionSpace, (FormatFactory)formatFactory)));
            } catch (Throwable cause) {
                throw new ValidationException(
                        String.format(
                                "Error creating sink format '%s' in option space '%s'.",
                                formatFactory.factoryIdentifier(), optionSpace),
                        cause);
            }
        }

        /**
         * Copies every forward option of the table factory that is present in the enriching
         * options into {@code allOptions}.
         */
        private void forwardOptions() {
            final DynamicTableFactory tableFactory = (DynamicTableFactory)this.factory;
            tableFactory
                    .forwardOptions()
                    .forEach(
                            option ->
                                    this.enrichingOptions
                                            .getOptional(option)
                                            .ifPresent(
                                                    value ->
                                                            this.allOptions.set(
                                                                    (ConfigOption)option, value)));
        }

        /**
         * Discovers the format factory whose identifier is configured under the given option.
         *
         * <p>The identifier is first checked against the enriching options. When a factory is
         * found, the prefixed keys of its required and optional options are recorded as consumed,
         * and its prefixed deprecated fallback keys are recorded as deprecated.
         *
         * @param formatFactoryClass type the format factory must implement
         * @param formatOption option that holds the format identifier
         * @return the discovered factory, or an empty {@link Optional} if the option is not set
         */
        private <F extends Factory> Optional<F> discoverOptionalFormatFactory(Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
            final String identifier = this.allOptions.get(formatOption);
            this.checkFormatIdentifierMatchesWithEnrichingOptions(formatOption, identifier);
            if (identifier == null) {
                return Optional.empty();
            }
            final F factory = FactoryUtil.discoverFactory(this.context.getClassLoader(), formatFactoryClass, identifier);
            final String optionPrefix = this.formatPrefix(factory, formatOption);

            // Typed list so that the stream lambdas below see ConfigOption elements, not Object.
            final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
            consumedOptions.addAll(factory.requiredOptions());
            consumedOptions.addAll(factory.optionalOptions());

            consumedOptions.stream()
                    .flatMap(option -> FactoryUtil.allKeysExpanded(optionPrefix, option, this.allOptions.keySet()).stream())
                    .forEach(this.consumedOptionKeys::add);
            consumedOptions.stream()
                    .flatMap(FactoryUtil::deprecatedKeys)
                    .map(k -> optionPrefix + k)
                    .forEach(this.deprecatedOptionKeys::add);
            return Optional.of(factory);
        }

        /**
         * Returns the result of {@code FactoryUtil.getFormatPrefix} for the given format option
         * and the identifier of the given format factory.
         */
        private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOption) {
            return FactoryUtil.getFormatPrefix(formatOption, formatFactory.factoryIdentifier());
        }

        /**
         * Builds the configuration handed to a format factory: a view of {@code allOptions}
         * under {@code formatPrefix}, into which the values of the factory's forwardable options
         * found in {@code enrichingOptions} (under the same prefix) are written.
         *
         * @param formatPrefix the key prefix of the format's option space
         * @param formatFactory the format factory whose forwardable options are consulted
         * @return the prefixed configuration for the format
         */
        private ReadableConfig createFormatOptions(String formatPrefix, FormatFactory formatFactory) {
            Set<ConfigOption<?>> forwardable = formatFactory.forwardOptions();
            DelegatingConfiguration planView = new DelegatingConfiguration(this.allOptions, formatPrefix);
            if (!forwardable.isEmpty()) {
                Configuration enrichingView = new DelegatingConfiguration(this.enrichingOptions, formatPrefix);
                for (ConfigOption<?> candidate : forwardable) {
                    Optional<?> enrichedValue = enrichingView.getOptional(candidate);
                    if (enrichedValue.isPresent()) {
                        // Raw cast: the wildcard type of the option cannot be named here.
                        planView.set((ConfigOption)candidate, enrichedValue.get());
                    }
                }
            }
            return planView;
        }

        /**
         * Verifies that a format identifier present in {@code enrichingOptions} agrees with the
         * one passed in. Does nothing when {@code enrichingOptions} has no value for
         * {@code formatOption}.
         *
         * @param formatOption the option holding the format identifier
         * @param identifierFromPlan the identifier to compare against; may be {@code null}
         * @throws ValidationException if {@code enrichingOptions} defines the format while
         *     {@code identifierFromPlan} is {@code null}, or if the two identifiers differ
         */
        private void checkFormatIdentifierMatchesWithEnrichingOptions(ConfigOption<String> formatOption, String identifierFromPlan) {
            String enrichingIdentifier = this.enrichingOptions.getOptional(formatOption).orElse(null);
            if (enrichingIdentifier == null) {
                return;
            }
            if (identifierFromPlan == null) {
                throw new ValidationException(String.format("The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. This is invalid, as either only the persisted plan table defines the format, or both the persisted plan table and the catalog table defines the same format.", formatOption, enrichingIdentifier));
            }
            if (identifierFromPlan.equals(enrichingIdentifier)) {
                return;
            }
            throw new ValidationException(String.format("Both persisted plan table and catalog table define the format option '%s', but they mismatch: '%s' != '%s'.", formatOption, identifierFromPlan, enrichingIdentifier));
        }
    }

    /**
     * {@link FactoryHelper} for a {@link ModuleFactory}. It is initialised from the options of
     * the given context and passes {@code PROPERTY_VERSION} as an implicit option.
     */
    @PublicEvolving
    public static class ModuleFactoryHelper extends FactoryHelper<ModuleFactory> {

        /**
         * @param moduleFactory the factory this helper works on
         * @param context the context whose options are used as the helper's configuration
         */
        public ModuleFactoryHelper(ModuleFactory moduleFactory, ModuleFactory.Context context) {
            super(moduleFactory, context.getOptions(), PROPERTY_VERSION);
        }
    }

    /**
     * {@link FactoryHelper} for a {@link CatalogFactory}. It is initialised from the options of
     * the given context and passes {@code PROPERTY_VERSION} as an implicit option.
     */
    @PublicEvolving
    public static class CatalogFactoryHelper extends FactoryHelper<CatalogFactory> {

        /**
         * @param catalogFactory the factory this helper works on
         * @param context the context whose options are used as the helper's configuration
         */
        public CatalogFactoryHelper(CatalogFactory catalogFactory, CatalogFactory.Context context) {
            super(catalogFactory, context.getOptions(), PROPERTY_VERSION);
        }
    }

    @PublicEvolving
    public static class FactoryHelper<F extends Factory> {
        protected final F factory;
        protected final Configuration allOptions;
        protected final Set<String> consumedOptionKeys;
        protected final Set<String> deprecatedOptionKeys;

        public FactoryHelper(F factory, Map<String, String> configuration, ConfigOption<?> ... implicitOptions) {
            this.factory = factory;
            this.allOptions = Configuration.fromMap(configuration);
            ArrayList consumedOptions = new ArrayList();
            consumedOptions.addAll(Arrays.asList(implicitOptions));
            consumedOptions.addAll(factory.requiredOptions());
            consumedOptions.addAll(factory.optionalOptions());
            this.consumedOptionKeys = consumedOptions.stream().flatMap(option -> FactoryUtil.allKeysExpanded(option, this.allOptions.keySet()).stream()).collect(Collectors.toSet());
            this.deprecatedOptionKeys = consumedOptions.stream().flatMap(x$0 -> FactoryUtil.deprecatedKeys(x$0)).collect(Collectors.toSet());
        }

        public void validate() {
            FactoryUtil.validateFactoryOptions(this.factory, this.allOptions);
            FactoryUtil.validateUnconsumedKeys(this.factory.factoryIdentifier(), this.allOptions.keySet(), this.consumedOptionKeys, this.deprecatedOptionKeys);
        }

        public void validateExcept(String ... prefixesToSkip) {
            Preconditions.checkArgument(prefixesToSkip.length > 0, "Prefixes to skip can not be empty.");
            List<String> prefixesList = Arrays.asList(prefixesToSkip);
            this.consumedOptionKeys.addAll(this.allOptions.keySet().stream().filter(key -> prefixesList.stream().anyMatch(key::startsWith)).collect(Collectors.toSet()));
            this.validate();
        }

        public ReadableConfig getOptions() {
            return this.allOptions;
        }
    }
}

