/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.agents.runtime.eventlog;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.EventFilter;
import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
import org.apache.flink.agents.runtime.eventlog.EventLogRecord;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class FileEventLogger
implements EventLogger {
    public static final String BASE_LOG_DIR_PROPERTY_KEY = "baseLogDir";
    private static final String DEFAULT_BASE_LOG_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "flink-agents").toString();
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final EventLoggerConfig config;
    private final EventFilter eventFilter;
    private PrintWriter writer;

    public FileEventLogger(EventLoggerConfig config) {
        this.config = config;
        this.eventFilter = config.getEventFilter();
    }

    @Override
    public void open(EventLoggerOpenParams params) throws Exception {
        String logFilePath = this.generateSubTaskLogFilePath(params);
        Path logPath = Paths.get(logFilePath, new String[0]).getParent();
        if (!Files.exists(logPath, new LinkOption[0])) {
            Files.createDirectories(logPath, new FileAttribute[0]);
        }
        this.writer = new PrintWriter(new BufferedWriter(new FileWriter(logFilePath, true)));
    }

    private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
        String baseLogDir = (String)this.config.getProperties().getOrDefault(BASE_LOG_DIR_PROPERTY_KEY, DEFAULT_BASE_LOG_DIR);
        String jobId = params.getRuntimeContext().getJobInfo().getJobId().toString();
        String taskName = params.getRuntimeContext().getTaskInfo().getTaskName().replaceAll("[\\\\/:*?\"<>|]", "_");
        int subTaskId = params.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        String fileName = String.format("events-%s-%s-%d.log", jobId, taskName, subTaskId);
        return Paths.get(baseLogDir, fileName).toString();
    }

    @Override
    public void append(EventContext context, Event event) throws Exception {
        if (this.writer == null) {
            throw new IllegalStateException("FileEventLogger not initialized. Call open() first.");
        }
        if (!this.eventFilter.accept(event, context)) {
            return;
        }
        EventLogRecord record = new EventLogRecord(context, event);
        this.writer.println(MAPPER.writeValueAsString(record));
    }

    @Override
    public void flush() throws Exception {
        if (this.writer == null) {
            throw new IllegalStateException("FileEventLogger not initialized. Call open() first.");
        }
        this.writer.flush();
    }

    @Override
    public void close() throws Exception {
        if (this.writer != null) {
            this.flush();
            this.writer.close();
        }
    }
}

