/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.table;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.AbstractFileStoreTable;
import org.apache.flink.table.store.table.sink.MemoryTableWrite;
import org.apache.flink.table.store.table.sink.SequenceGenerator;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class ChangelogWithKeyFileStoreTable
extends AbstractFileStoreTable {
    private static final String KEY_FIELD_PREFIX = "_KEY_";
    private static final long serialVersionUID = 1L;
    private final KeyValueFileStore store;

    ChangelogWithKeyFileStoreTable(Path path, SchemaManager schemaManager, TableSchema tableSchema) {
        super(path, tableSchema);
        MergeFunction mergeFunction;
        RowType rowType = tableSchema.logicalRowType();
        Configuration conf = Configuration.fromMap(tableSchema.options());
        CoreOptions.MergeEngine mergeEngine = (CoreOptions.MergeEngine)((Object)conf.get(CoreOptions.MERGE_ENGINE));
        switch (mergeEngine) {
            case DEDUPLICATE: {
                mergeFunction = new DeduplicateMergeFunction();
                break;
            }
            case PARTIAL_UPDATE: {
                List fieldTypes = rowType.getChildren();
                RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
                for (int i = 0; i < fieldTypes.size(); ++i) {
                    fieldGetters[i] = RowDataUtils.createNullCheckingFieldGetter((LogicalType)fieldTypes.get(i), i);
                }
                mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unsupported merge engine: " + (Object)((Object)mergeEngine));
            }
        }
        this.store = new KeyValueFileStore(schemaManager, tableSchema.id(), new CoreOptions(conf), tableSchema.logicalPartitionType(), this.addKeyNamePrefix(tableSchema.logicalBucketKeyType()), this.addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()), rowType, mergeFunction);
    }

    private RowType addKeyNamePrefix(RowType type) {
        return new RowType(type.getFields().stream().map(f -> new RowType.RowField(KEY_FIELD_PREFIX + f.getName(), f.getType(), (String)f.getDescription().orElse(null))).collect(Collectors.toList()));
    }

    @Override
    public TableScan newScan() {
        final KeyValueFileStoreScan scan = this.store.newScan();
        return new TableScan(scan, this.tableSchema, this.store.pathFactory()){

            @Override
            protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                return new MergeTreeSplitGenerator(ChangelogWithKeyFileStoreTable.this.store.newKeyComparator(), ChangelogWithKeyFileStoreTable.this.store.options().splitTargetSize(), ChangelogWithKeyFileStoreTable.this.store.options().splitOpenFileCost());
            }

            @Override
            protected void withNonPartitionFilter(Predicate predicate) {
                List<Predicate> keyFilters = PredicateBuilder.pickTransformFieldMapping(PredicateBuilder.splitAnd(predicate), ChangelogWithKeyFileStoreTable.this.tableSchema.fieldNames(), ChangelogWithKeyFileStoreTable.this.tableSchema.trimmedPrimaryKeys());
                if (keyFilters.size() > 0) {
                    scan.withKeyFilter(PredicateBuilder.and(keyFilters));
                }
            }
        };
    }

    @Override
    public TableRead newRead() {
        List<String> primaryKeys = this.tableSchema.trimmedPrimaryKeys();
        final Set nonPrimaryKeys = this.tableSchema.fieldNames().stream().filter(name -> !primaryKeys.contains(name)).collect(Collectors.toSet());
        return new KeyValueTableRead(this.store.newRead()){

            @Override
            public TableRead withFilter(Predicate predicate) {
                ArrayList<Predicate> predicates = new ArrayList<Predicate>();
                for (Predicate sub : PredicateBuilder.splitAnd(predicate)) {
                    if (PredicateBuilder.containsFields(sub, nonPrimaryKeys)) continue;
                    predicates.add(sub);
                }
                if (predicates.size() > 0) {
                    this.read.withFilter(PredicateBuilder.and(predicates));
                }
                return this;
            }

            @Override
            public TableRead withProjection(int[][] projection) {
                this.read.withValueProjection(projection);
                return this;
            }

            @Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
                return new ValueContentRowDataRecordIterator(kvRecordIterator);
            }
        };
    }

    @Override
    public TableWrite newWrite() {
        SinkRecordConverter recordConverter = new SinkRecordConverter(this.store.options().bucket(), this.tableSchema);
        final SequenceGenerator sequenceGenerator = this.store.options().sequenceField().map(field -> new SequenceGenerator((String)field, this.schema().logicalRowType())).orElse(null);
        return new MemoryTableWrite<KeyValue>((FileStoreWrite)this.store.newWrite(), recordConverter, this.store.options()){
            private final KeyValue kv;
            {
                super(write, recordConverter, options);
                this.kv = new KeyValue();
            }

            @Override
            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer) throws Exception {
                long sequenceNumber = sequenceGenerator == null ? -1L : sequenceGenerator.generate(record.row());
                writer.write(this.kv.replace((RowData)record.primaryKey(), sequenceNumber, record.row().getRowKind(), record.row()));
            }
        };
    }

    public KeyValueFileStore store() {
        return this.store;
    }
}

