/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.channel.accrual;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.channel.ChannelListener;
import pt.unl.fct.di.novasys.channel.accrual.InConnectionState;
import pt.unl.fct.di.novasys.channel.accrual.OutConnectionState;
import pt.unl.fct.di.novasys.channel.accrual.PhiAccrual;
import pt.unl.fct.di.novasys.channel.accrual.events.PhiEvent;
import pt.unl.fct.di.novasys.channel.accrual.messaging.AccrualAppMessage;
import pt.unl.fct.di.novasys.channel.accrual.messaging.AccrualHbMessage;
import pt.unl.fct.di.novasys.channel.accrual.messaging.AccrualMessage;
import pt.unl.fct.di.novasys.channel.accrual.messaging.AccrualMessageSerializer;
import pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.AttributeValidator;
import pt.unl.fct.di.novasys.network.Connection;
import pt.unl.fct.di.novasys.network.ISerializer;
import pt.unl.fct.di.novasys.network.NetworkManager;
import pt.unl.fct.di.novasys.network.data.Attributes;
import pt.unl.fct.di.novasys.network.data.Host;

public class AccrualChannel<T>
extends SingleThreadedBiChannel<T, AccrualMessage<T>>
implements AttributeValidator {
    public static final String NAME = "AccrualChannel";
    public static final String ADDRESS_KEY = "address";
    public static final String PORT_KEY = "port";
    public static final String WORKER_GROUP_KEY = "worker_group";
    public static final String TRIGGER_SENT_KEY = "trigger_sent";
    public static final String CONNECT_TIMEOUT_KEY = "connect_timeout";
    public static final String LISTEN_ADDRESS_ATTRIBUTE = "listen_address";
    public static final String DEFAULT_PORT = "8551";
    public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
    public static final int CONNECTION_OUT = 0;
    public static final int CONNECTION_IN = 1;
    public static final String WINDOW_SIZE_KEY = "window_size";
    public static final String HB_INTERVAL_KEY = "hb_interval";
    public static final String HB_INTERVAL_ATTRIBUTE = "hb_interval";
    public static final String MIN_STD_DEVIATION_KEY = "std_deviation";
    public static final String ACCEPTABLE_HB_PAUSE_KEY = "acceptable_hb_pause";
    public static final String THRESHOLD_KEY = "threshold";
    public static final String PREDICT_INTERVAL_KEY = "predict_interval";
    public static final String DEFAULT_WINDOW_SIZE = "1000";
    public static final String DEFAULT_HB_INTERVAL = "1000";
    public static final String DEFAULT_MIN_STD_DEVIATION = "200";
    public static final String DEFAULT_ACCEPTABLE_HB_PAUSE = "1000";
    public static final String DEFAULT_THRESHOLD = "-1";
    public static final String DEFAULT_PREDICT_INTERVAL = "100";
    private static final short ACCRUAL_MAGIC_NUMBER = 18022;
    private static final Logger logger = LogManager.getLogger(AccrualChannel.class);
    private final Map<Host, LinkedList<InConnectionState<AccrualMessage<T>>>> inConnections;
    private final Map<Host, OutConnectionState<AccrualMessage<T>>> outConnections;
    private final boolean triggerSent;
    private final NetworkManager<AccrualMessage<T>> network;
    private final ChannelListener<T> listener;
    private final Attributes attributes;
    private final int windowSize;
    private final int hbInterval;
    private final int minStdDeviation;
    private final int acceptableHbPause;
    private final double threshold;
    private final int predictInterval;
    private final Map<Host, PhiAccrual> monitors;

    public AccrualChannel(ISerializer<T> serializer, ChannelListener<T> list, Properties properties) throws IOException {
        super(NAME);
        this.listener = list;
        if (!properties.containsKey(ADDRESS_KEY)) {
            throw new IllegalArgumentException("AccrualChannel requires binding address");
        }
        InetAddress addr = Inet4Address.getByName(properties.getProperty(ADDRESS_KEY));
        int port = Integer.parseInt(properties.getProperty(PORT_KEY, DEFAULT_PORT));
        int connTimeout = Integer.parseInt(properties.getProperty(CONNECT_TIMEOUT_KEY, "1000"));
        this.triggerSent = Boolean.parseBoolean(properties.getProperty(TRIGGER_SENT_KEY, "false"));
        this.windowSize = Integer.parseInt(properties.getProperty(WINDOW_SIZE_KEY, "1000"));
        this.hbInterval = Integer.parseInt(properties.getProperty("hb_interval", "1000"));
        this.minStdDeviation = Integer.parseInt(properties.getProperty(MIN_STD_DEVIATION_KEY, DEFAULT_MIN_STD_DEVIATION));
        this.acceptableHbPause = Integer.parseInt(properties.getProperty(ACCEPTABLE_HB_PAUSE_KEY, "1000"));
        this.threshold = Double.parseDouble(properties.getProperty(THRESHOLD_KEY, DEFAULT_THRESHOLD));
        this.predictInterval = Integer.parseInt(properties.getProperty(PREDICT_INTERVAL_KEY, DEFAULT_PREDICT_INTERVAL));
        Host listenAddress = new Host(addr, port);
        EventLoopGroup eventExecutors = properties.containsKey(WORKER_GROUP_KEY) ? (EventLoopGroup)properties.get(WORKER_GROUP_KEY) : NetworkManager.createNewWorkerGroup();
        AccrualMessageSerializer<T> accSerializer = new AccrualMessageSerializer<T>(serializer);
        this.network = new NetworkManager<T>(accSerializer, this, 0, 0, connTimeout, eventExecutors);
        this.network.createServerSocket(this, listenAddress, (AttributeValidator)this, eventExecutors);
        this.attributes = new Attributes();
        this.attributes.putShort("magic_number", (short)18022);
        this.attributes.putHost(LISTEN_ADDRESS_ATTRIBUTE, listenAddress);
        this.attributes.putInt("hb_interval", this.hbInterval);
        this.inConnections = new HashMap<Host, LinkedList<InConnectionState<AccrualMessage<T>>>>();
        this.outConnections = new HashMap<Host, OutConnectionState<AccrualMessage<T>>>();
        this.monitors = new HashMap<Host, PhiAccrual>();
        this.loop.scheduleAtFixedRate(this::predictPhi, (long)this.predictInterval, (long)this.predictInterval, TimeUnit.MILLISECONDS);
    }

    private void predictPhi() {
        HashMap<Host, Map<String, Double>> values = new HashMap<Host, Map<String, Double>>();
        long timestamp = System.currentTimeMillis();
        this.monitors.forEach((k, v) -> {
            Map<String, Double> res = v.phi(timestamp);
            double val = res.get("phi");
            if (this.threshold <= 0.0 || val >= this.threshold) {
                values.put((Host)k, res);
            }
        });
        if (!values.isEmpty()) {
            this.listener.deliverEvent(new PhiEvent(values));
        }
    }

    @Override
    protected void onOpenConnection(Host peer) {
        OutConnectionState<AccrualMessage<T>> conState = this.outConnections.get(peer);
        if (conState == null) {
            logger.debug("onOpenConnection creating connection to: " + peer);
            this.outConnections.put(peer, new OutConnectionState<AccrualMessage<T>>(this.network.createConnection(peer, this.attributes, this)));
        } else if (conState.getState() == OutConnectionState.State.DISCONNECTING) {
            logger.debug("onOpenConnection reopening after close to: " + peer);
            conState.setState(OutConnectionState.State.DISCONNECTING_RECONNECT);
        } else {
            logger.debug("onOpenConnection ignored: " + peer);
        }
    }

    @Override
    protected void onSendMessage(T msg, Host peer, int connection) {
        logger.debug("SendMessage " + msg + " " + peer + " " + (connection == 1 ? "IN" : "OUT"));
        if (connection <= 0) {
            OutConnectionState<AccrualMessage<T>> conState = this.outConnections.get(peer);
            if (conState != null) {
                if (conState.getState() == OutConnectionState.State.CONNECTING || conState.getState() == OutConnectionState.State.DISCONNECTING_RECONNECT) {
                    conState.getQueue().add(new AccrualAppMessage<T>(msg));
                } else if (conState.getState() == OutConnectionState.State.CONNECTED) {
                    this.sendWithListener(new AccrualAppMessage<T>(msg), peer, conState.getConnection());
                } else if (conState.getState() == OutConnectionState.State.DISCONNECTING) {
                    conState.getQueue().add(new AccrualAppMessage<T>(msg));
                    conState.setState(OutConnectionState.State.DISCONNECTING_RECONNECT);
                }
            } else {
                this.listener.messageFailed(msg, peer, new IllegalArgumentException("No outgoing connection"));
            }
        } else if (connection == 1) {
            LinkedList<InConnectionState<AccrualMessage<T>>> inConnList = this.inConnections.get(peer);
            if (inConnList != null) {
                this.sendWithListener(new AccrualAppMessage<T>(msg), peer, inConnList.getLast().getConnection());
            } else {
                this.listener.messageFailed(msg, peer, new IllegalArgumentException("No incoming connection"));
            }
        } else {
            this.listener.messageFailed(msg, peer, new IllegalArgumentException("Invalid connection: " + connection));
            logger.error("Invalid sendMessage mode " + connection);
        }
    }

    private void sendWithListener(AccrualAppMessage<T> msg, Host peer, Connection<AccrualMessage<T>> established) {
        Promise promise = this.loop.newPromise();
        promise.addListener(future -> {
            if (future.isSuccess() && this.triggerSent) {
                this.listener.messageSent(msg.getPayload(), peer);
            } else if (!future.isSuccess()) {
                this.listener.messageFailed(msg.getPayload(), peer, future.cause());
            }
        });
        established.sendMessage(msg, (Promise<Void>)promise);
    }

    @Override
    protected void onOutboundConnectionUp(Connection<AccrualMessage<T>> conn) {
        logger.debug("OutboundConnectionUp " + conn.getPeer());
        OutConnectionState<AccrualMessage<T>> conState = this.outConnections.get(conn.getPeer());
        if (conState == null) {
            throw new AssertionError((Object)("ConnectionUp with no conState: " + conn));
        }
        if (conState.getState() == OutConnectionState.State.CONNECTED) {
            throw new AssertionError((Object)("ConnectionUp in CONNECTED state: " + conn));
        }
        if (conState.getState() == OutConnectionState.State.CONNECTING) {
            conState.setState(OutConnectionState.State.CONNECTED);
            this.monitors.put(conn.getPeer(), new PhiAccrual(this.windowSize, this.threshold, this.minStdDeviation, this.acceptableHbPause, this.hbInterval * 4));
            conState.getQueue().forEach(m -> this.sendWithListener((AccrualAppMessage)m, conn.getPeer(), conn));
            conState.getQueue().clear();
            this.listener.deliverEvent(new OutConnectionUp(conn.getPeer()));
        }
    }

    @Override
    protected void onCloseConnection(Host peer, int connection) {
        logger.debug("CloseConnection " + peer);
        OutConnectionState<AccrualMessage<T>> conState = this.outConnections.get(peer);
        if (conState != null && (conState.getState() == OutConnectionState.State.CONNECTED || conState.getState() == OutConnectionState.State.CONNECTING || conState.getState() == OutConnectionState.State.DISCONNECTING_RECONNECT)) {
            conState.setState(OutConnectionState.State.DISCONNECTING);
            conState.getQueue().clear();
            conState.getConnection().disconnect();
        }
    }

    @Override
    protected void onOutboundConnectionDown(Connection<AccrualMessage<T>> conn, Throwable cause) {
        logger.debug("OutboundConnectionDown " + conn.getPeer() + (cause != null ? " " + cause : ""));
        OutConnectionState<AccrualMessage<T>> conState = this.outConnections.remove(conn.getPeer());
        this.monitors.remove(conn.getPeer());
        if (conState == null) {
            throw new AssertionError((Object)("ConnectionDown with no conState: " + conn));
        }
        if (conState.getState() == OutConnectionState.State.CONNECTING) {
            throw new AssertionError((Object)("ConnectionDown in CONNECTING state: " + conn));
        }
        if (conState.getState() == OutConnectionState.State.CONNECTED) {
            this.listener.deliverEvent(new OutConnectionDown(conn.getPeer(), cause));
        } else if (conState.getState() == OutConnectionState.State.DISCONNECTING_RECONNECT) {
            this.outConnections.put(conn.getPeer(), new OutConnectionState<AccrualMessage<T>>(this.network.createConnection(conn.getPeer(), this.attributes, this), conState.getQueue()));
        }
    }

    @Override
    protected void onOutboundConnectionFailed(Connection<AccrualMessage<T>> conn, Throwable cause) {
        logger.debug("OutboundConnectionFailed " + conn.getPeer() + (cause != null ? " " + cause : ""));
        OutConnectionState<AccrualMessage<T>> conState = this.outConnections.remove(conn.getPeer());
        if (conState == null) {
            throw new AssertionError((Object)("ConnectionFailed with no conState: " + conn));
        }
        if (conState.getState() == OutConnectionState.State.CONNECTING) {
            this.listener.deliverEvent(new OutConnectionFailed<AccrualMessage<T>>(conn.getPeer(), conState.getQueue(), cause));
        } else if (conState.getState() == OutConnectionState.State.DISCONNECTING_RECONNECT) {
            this.outConnections.put(conn.getPeer(), new OutConnectionState<AccrualMessage<T>>(this.network.createConnection(conn.getPeer(), this.attributes, this), conState.getQueue()));
        } else if (conState.getState() == OutConnectionState.State.CONNECTED) {
            throw new AssertionError((Object)("ConnectionFailed in state: " + (Object)((Object)conState.getState()) + " - " + conn));
        }
    }

    private void sendHeartbeat(InConnectionState<AccrualMessage<T>> conState) {
        conState.getConnection().sendMessage(new AccrualHbMessage(conState.getAndIncCounter()));
    }

    @Override
    protected void onInboundConnectionUp(Connection<AccrualMessage<T>> con) {
        Host clientSocket;
        try {
            clientSocket = con.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
        }
        catch (IOException e) {
            logger.error("Inbound connection without valid listen address in connectionUp: " + e.getMessage());
            con.disconnect();
            return;
        }
        LinkedList inConnList = this.inConnections.computeIfAbsent(clientSocket, k -> new LinkedList());
        InConnectionState<AccrualMessage<T>> inConnState = new InConnectionState<AccrualMessage<T>>(con);
        inConnList.add(inConnState);
        int remoteHbInterval = con.getPeerAttributes().getInt("hb_interval");
        ScheduledFuture scheduledFuture = con.getLoop().scheduleAtFixedRate(() -> this.sendHeartbeat(inConnState), (long)remoteHbInterval, (long)remoteHbInterval, TimeUnit.MILLISECONDS);
        inConnState.setPeriodicHbTask(scheduledFuture);
        if (inConnList.size() == 1) {
            logger.debug("InboundConnectionUp " + clientSocket);
            this.listener.deliverEvent(new InConnectionUp(clientSocket));
        } else {
            logger.debug("Multiple InboundConnectionUp " + inConnList.size() + clientSocket);
        }
    }

    @Override
    protected void onInboundConnectionDown(Connection<AccrualMessage<T>> con, Throwable cause) {
        Host clientSocket;
        try {
            clientSocket = con.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
        }
        catch (IOException e) {
            logger.error("Inbound connection without valid listen address in connectionDown: " + e.getMessage());
            con.disconnect();
            return;
        }
        LinkedList<InConnectionState<AccrualMessage<T>>> inConnList = this.inConnections.get(clientSocket);
        if (inConnList == null || inConnList.isEmpty()) {
            throw new AssertionError((Object)("No connections in InboundConnectionDown " + clientSocket));
        }
        Optional<InConnectionState> first = inConnList.stream().filter(conState -> conState.getConnection() == con).findFirst();
        if (!first.isPresent()) {
            throw new AssertionError((Object)("No connection in InboundConnectionDown " + clientSocket));
        }
        inConnList.remove(first.get());
        first.get().getPeriodicHbTask().cancel(true);
        if (inConnList.isEmpty()) {
            logger.debug("InboundConnectionDown " + clientSocket + (cause != null ? " " + cause : ""));
            this.listener.deliverEvent(new InConnectionDown(clientSocket, cause));
            this.inConnections.remove(clientSocket);
        } else {
            logger.debug("Extra InboundConnectionDown " + inConnList.size() + clientSocket);
        }
    }

    @Override
    public void onServerSocketBind(boolean success, Throwable cause) {
        if (success) {
            logger.debug("Server socket ready");
        } else {
            logger.error("Server socket bind failed: " + cause);
        }
    }

    @Override
    public void onServerSocketClose(boolean success, Throwable cause) {
        logger.debug("Server socket closed. " + (success ? "" : "Cause: " + cause));
    }

    private void handleHbMessage(AccrualHbMessage<T> msg, Connection<AccrualMessage<T>> conn) {
        this.monitors.get(conn.getPeer()).receivedHb(msg.getCounter());
    }

    private void handleAppMessage(AccrualAppMessage<T> msg, Connection<AccrualMessage<T>> conn) {
        Host host;
        if (conn.isInbound()) {
            try {
                host = conn.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
            }
            catch (IOException e) {
                logger.error("Inbound connection without valid listen address in deliver message: " + e.getMessage());
                conn.disconnect();
                return;
            }
        } else {
            host = conn.getPeer();
        }
        logger.debug("DeliverMessage " + msg + " " + host + " " + (conn.isInbound() ? "IN" : "OUT"));
        this.listener.deliverMessage(msg.getPayload(), host);
    }

    @Override
    public void onDeliverMessage(AccrualMessage<T> msg, Connection<AccrualMessage<T>> conn) {
        switch (msg.getType()) {
            case APP_MSG: {
                this.handleAppMessage((AccrualAppMessage)msg, conn);
                break;
            }
            case HB: {
                this.handleHbMessage((AccrualHbMessage)msg, conn);
            }
        }
    }

    @Override
    public boolean validateAttributes(Attributes attr) {
        Short channel = attr.getShort("magic_number");
        return channel != null && channel == 18022;
    }
}

