/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.channel.tcp;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualHashBidiMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.channel.ChannelListener;
import pt.unl.fct.di.novasys.channel.IChannel;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.AttributeValidator;
import pt.unl.fct.di.novasys.network.Connection;
import pt.unl.fct.di.novasys.network.ISerializer;
import pt.unl.fct.di.novasys.network.NetworkManager;
import pt.unl.fct.di.novasys.network.data.Attributes;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.di.novasys.network.listeners.InConnListener;
import pt.unl.fct.di.novasys.network.listeners.MessageListener;
import pt.unl.fct.di.novasys.network.listeners.OutConnListener;

/**
 * TCP channel that keeps outgoing and incoming connections in separate tables.
 *
 * <p>Outgoing connections are keyed by the peer's {@link Host}. While an outgoing connection is
 * still being established, messages addressed to it are queued and flushed once the connection
 * comes up. Incoming connections are keyed by the listen address the remote side advertises in
 * the {@link #LISTEN_ADDRESS_ATTRIBUTE} attribute.
 *
 * <p>Thread-safety: the outgoing tables are {@link ConcurrentHashMap}s; every access to the
 * incoming table is guarded by synchronizing on the table itself. The connection callbacks
 * assert that they are invoked on the connection's event loop.
 *
 * @param <T> type of the messages carried by this channel
 */
public class MultithreadedTCPChannel<T>
        implements IChannel<T>, MessageListener<T>, InConnListener<T>, OutConnListener<T>, AttributeValidator {

    private static final Logger logger = LogManager.getLogger(MultithreadedTCPChannel.class);

    /** Value placed in (and expected from) the {@code magic_number} connection attribute. */
    private static final short TCP_MAGIC_NUMBER = 17669;
    /** Name of the attribute carrying {@link #TCP_MAGIC_NUMBER}. */
    private static final String MAGIC_NUMBER_ATTRIBUTE = "magic_number";

    public static final String NAME = "MultithreadedTCPChannel";
    /** Property key (mandatory): address the server socket binds to. */
    public static final String ADDRESS_KEY = "address";
    /** Property key (optional): port the server socket binds to; defaults to {@link #DEFAULT_PORT}. */
    public static final String PORT_KEY = "port";
    /** Property key (optional): an {@link EventLoopGroup} to use instead of creating a new one. */
    public static final String WORKER_GROUP_KEY = "workerGroup";
    /** Connection attribute in which each side advertises its own listen address. */
    public static final String LISTEN_ADDRESS_ATTRIBUTE = "listen_address";
    public static final int DEFAULT_PORT = 8573;
    /** {@code connection} argument value selecting the outgoing connection to a peer. */
    public static final int CONNECTION_OUT = 0;
    /** {@code connection} argument value selecting the incoming connection from a peer. */
    public static final int CONNECTION_IN = 1;

    private final NetworkManager<T> network;
    private final ChannelListener<T> listener;
    private final Attributes attributes;
    /** Outgoing connections not yet established, each with the messages waiting for it. */
    private final Map<Host, Pair<Connection<T>, Queue<T>>> pendingOut;
    /** Established outgoing connections, keyed by peer. */
    private final Map<Host, Connection<T>> establishedOut;
    /** Established incoming connections, keyed by advertised listen address. Guarded by itself. */
    private final BidiMap<Host, Connection<T>> establishedIn;

    /**
     * Creates the channel and opens its server socket.
     *
     * @param serializer serializer handed to the underlying {@link NetworkManager}
     * @param list       listener that receives messages, send results and connection events
     * @param properties configuration; must contain {@link #ADDRESS_KEY}, may contain
     *                   {@link #PORT_KEY} (a {@link Number} or a numeric string) and
     *                   {@link #WORKER_GROUP_KEY}
     * @throws IllegalArgumentException if the address is missing or the port is not a valid integer
     * @throws IOException              declared by the original signature (address resolution)
     */
    public MultithreadedTCPChannel(ISerializer<T> serializer, ChannelListener<T> list, Properties properties) throws IOException {
        this.listener = list;
        if (!properties.containsKey(ADDRESS_KEY)) {
            throw new IllegalArgumentException(NAME + " requires binding address");
        }
        InetAddress addr = Inet4Address.getByName(properties.getProperty(ADDRESS_KEY));
        int port = resolvePort(properties);
        Host listenAddress = new Host(addr, port);

        // Initialise all state BEFORE handing `this` to the network layer, so a callback that
        // fires as soon as the server socket exists never observes unassigned fields.
        this.attributes = new Attributes();
        this.attributes.putShort(MAGIC_NUMBER_ATTRIBUTE, TCP_MAGIC_NUMBER);
        this.attributes.putHost(LISTEN_ADDRESS_ATTRIBUTE, listenAddress);
        this.pendingOut = new ConcurrentHashMap<>();
        this.establishedOut = new ConcurrentHashMap<>();
        this.establishedIn = new DualHashBidiMap<>();

        EventLoopGroup eventExecutors = properties.containsKey(WORKER_GROUP_KEY)
                ? (EventLoopGroup) properties.get(WORKER_GROUP_KEY)
                : NetworkManager.createNewWorkerGroup(0);
        // NOTE(review): 1000 / 3000 / 1000 are presumably heartbeat interval, heartbeat tolerance
        // and connect timeout in ms — confirm against NetworkManager's constructor.
        this.network = new NetworkManager<T>(serializer, this, 1000, 3000, 1000, eventExecutors);
        this.network.createServerSocket(this, new Host(addr, port), (AttributeValidator) this, eventExecutors);
    }

    /**
     * Reads the port from {@code properties}, accepting either a {@link Number} or a numeric
     * string, and falling back to {@link #DEFAULT_PORT} when the key is absent.
     */
    private static int resolvePort(Properties properties) {
        Object raw = properties.get(PORT_KEY);
        if (raw == null) {
            return DEFAULT_PORT;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        try {
            return Integer.parseInt(raw.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(NAME + " invalid port: " + raw, e);
        }
    }

    /**
     * Starts an outgoing connection to {@code peer} unless one is already established or pending.
     *
     * @param peer host to connect to
     */
    @Override
    public void openConnection(Host peer) {
        if (this.establishedOut.containsKey(peer)) {
            return;
        }
        this.pendingOut.computeIfAbsent(peer, k -> {
            Connection<T> conn = this.network.createConnection(peer, this.attributes, this);
            Queue<T> backlog = new LinkedList<>();
            return Pair.of(conn, backlog);
        });
    }

    /**
     * Sends {@code msg} to {@code peer}.
     *
     * <p>For {@code connection <= CONNECTION_OUT} the established outgoing connection is used; if
     * the connection is still pending the message is queued; otherwise the listener is told the
     * send failed. For {@link #CONNECTION_IN} the established incoming connection is used, or the
     * listener is told the send failed. Any other value is reported as a failure.
     *
     * @param msg        message to send
     * @param peer       destination host
     * @param connection which connection to use ({@link #CONNECTION_OUT} or {@link #CONNECTION_IN})
     */
    @Override
    public void sendMessage(T msg, Host peer, int connection) {
        logger.debug("SendMessage {} {} {}", msg, peer, connection == CONNECTION_IN ? "IN" : "OUT");
        if (connection <= CONNECTION_OUT) {
            Connection<T> established = this.establishedOut.get(peer);
            if (established != null) {
                this.sendWithListener(msg, peer, established);
                return;
            }
            Pair<Connection<T>, Queue<T>> pending = this.pendingOut.get(peer);
            if (pending != null) {
                pending.getRight().add(msg);
            } else {
                this.listener.messageFailed(msg, peer, new IllegalArgumentException("No outgoing connection to peer."));
            }
        } else if (connection == CONNECTION_IN) {
            Connection<T> inConn;
            // Same lock as the writers in inboundConnectionUp/Down: the map is not thread-safe.
            synchronized (this.establishedIn) {
                inConn = this.establishedIn.get(peer);
            }
            if (inConn != null) {
                this.sendWithListener(msg, peer, inConn);
            } else {
                this.listener.messageFailed(msg, peer, new IllegalArgumentException("No incoming connection"));
            }
        } else {
            this.listener.messageFailed(msg, peer, new IllegalArgumentException("Invalid send connection: " + connection));
            logger.error("Invalid sendMessage connection {}", connection);
        }
    }

    /**
     * Sends {@code msg} over {@code conn} from the connection's event loop and reports the
     * outcome to the listener ({@code messageSent} on success, {@code messageFailed} otherwise).
     */
    private void sendWithListener(T msg, Host peer, Connection<T> conn) {
        EventLoop loop = conn.getLoop();
        loop.submit(() -> {
            Promise<Void> promise = loop.newPromise();
            promise.addListener(future -> {
                if (future.isSuccess()) {
                    this.listener.messageSent(msg, peer);
                } else {
                    this.listener.messageFailed(msg, peer, future.cause());
                }
            });
            conn.sendMessage(msg, promise);
        });
    }

    /**
     * Disconnects the pending and/or established outgoing connection to {@code peer}.
     *
     * <p>The {@code connection} argument is not consulted. Messages still queued on a pending
     * connection are dropped here without a listener notification. The established entry is
     * only looked up, not removed; {@link #outboundConnectionDown} removes it.
     *
     * @param peer       host whose outgoing connection should be closed
     * @param connection unused
     */
    @Override
    public void closeConnection(Host peer, int connection) {
        logger.debug("CloseConnection {}", peer);
        Pair<Connection<T>, Queue<T>> pending = this.pendingOut.remove(peer);
        if (pending != null) {
            pending.getLeft().disconnect();
        }
        Connection<T> established = this.establishedOut.get(peer);
        if (established != null) {
            established.disconnect();
        }
    }

    /**
     * Promotes a pending outgoing connection to established, notifies the listener with
     * {@link OutConnectionUp}, then flushes the messages queued while it was pending.
     *
     * @throws IllegalStateException if the pending entry refers to a different connection object,
     *                               or an established connection to the peer already exists
     */
    @Override
    public void outboundConnectionUp(Connection<T> conn) {
        assert (conn.getLoop().inEventLoop());
        Host peer = conn.getPeer();
        logger.debug("OutboundConnectionUp {}", peer);
        Pair<Connection<T>, Queue<T>> pending = this.pendingOut.remove(peer);
        if (pending == null) {
            logger.warn("ConnectionUp with no pending: {}", conn);
            return;
        }
        if (pending.getLeft() != conn) {
            throw new IllegalStateException("Reference mismatch");
        }
        Connection<T> previous = this.establishedOut.put(peer, conn);
        if (previous != null) {
            throw new IllegalStateException("Connection already exists in connection up");
        }
        this.listener.deliverEvent(new OutConnectionUp(peer));
        pending.getRight().forEach(queued -> this.sendWithListener(queued, conn.getPeer(), conn));
    }

    /**
     * Removes an established outgoing connection and notifies the listener with
     * {@link OutConnectionDown}; only logs a warning if the connection was not tracked.
     */
    @Override
    public void outboundConnectionDown(Connection<T> conn, Throwable cause) {
        assert (conn.getLoop().inEventLoop());
        logger.debug("OutboundConnectionDown {}{}", conn.getPeer(), cause != null ? " " + cause : "");
        Connection<T> removed = this.establishedOut.remove(conn.getPeer());
        if (removed != null) {
            this.listener.deliverEvent(new OutConnectionDown(conn.getPeer(), cause));
        } else {
            logger.warn("ConnectionDown with no context available: {}", conn);
        }
    }

    /**
     * Drops a pending outgoing connection that could not be established and hands its queued
     * messages to the listener inside an {@link OutConnectionFailed} event.
     *
     * @throws IllegalStateException if an established connection to the same peer exists
     */
    @Override
    public void outboundConnectionFailed(Connection<T> conn, Throwable cause) {
        assert (conn.getLoop().inEventLoop());
        logger.debug("OutboundConnectionFailed {}{}", conn.getPeer(), cause != null ? " " + cause : "");
        if (this.establishedOut.containsKey(conn.getPeer())) {
            throw new IllegalStateException("Connection exists in conn failed");
        }
        Pair<Connection<T>, Queue<T>> pending = this.pendingOut.remove(conn.getPeer());
        if (pending != null) {
            // Raw cast kept from the original: the event's generic signature is not visible here.
            this.listener.deliverEvent(new OutConnectionFailed(conn.getPeer(), (Queue) pending.getRight(), cause));
        } else {
            logger.warn("ConnectionFailed with no pending: {}", conn);
        }
    }

    /**
     * Registers a new incoming connection under the listen address advertised by the remote side
     * and notifies the listener with {@link InConnectionUp}.
     *
     * <p>The connection is disconnected (and not registered) when the listen-address attribute
     * cannot be parsed or is absent.
     *
     * @throws RuntimeException if an incoming connection from the same listen address is already
     *                          registered
     */
    @Override
    public void inboundConnectionUp(Connection<T> con) {
        assert (con.getLoop().inEventLoop());
        Host clientSocket;
        try {
            clientSocket = con.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
        } catch (IOException e) {
            logger.error("Error parsing LISTEN_ADDRESS_ATTRIBUTE of inbound connection: " + e.getMessage());
            con.disconnect();
            return;
        }
        if (clientSocket == null) {
            logger.error("Inbound connection without LISTEN_ADDRESS: {} {}", con.getPeer(), con.getPeerAttributes());
            // Without a listen address the connection can never be looked up; close it like the
            // malformed-attribute case instead of leaving it open and untracked.
            con.disconnect();
            return;
        }
        logger.debug("InboundConnectionUp {}", clientSocket);
        Connection<T> existing;
        synchronized (this.establishedIn) {
            existing = this.establishedIn.putIfAbsent(clientSocket, con);
        }
        if (existing != null) {
            throw new RuntimeException("Double incoming connection from: " + clientSocket + " (" + con.getPeer() + ")");
        }
        this.listener.deliverEvent(new InConnectionUp(clientSocket));
    }

    /**
     * Unregisters an incoming connection and notifies the listener with {@link InConnectionDown}.
     *
     * <p>Connections that were never registered (rejected in {@link #inboundConnectionUp}) only
     * produce a warning, so the listener never receives an event with a {@code null} host.
     */
    @Override
    public void inboundConnectionDown(Connection<T> conn, Throwable cause) {
        assert (conn.getLoop().inEventLoop());
        Host host;
        synchronized (this.establishedIn) {
            host = this.establishedIn.removeValue(conn);
        }
        String causeSuffix = cause != null ? " " + cause : "";
        if (host == null) {
            logger.warn("InboundConnectionDown for untracked connection: {}{}", conn, causeSuffix);
            return;
        }
        logger.debug("InboundConnectionDown {}{}", host, causeSuffix);
        this.listener.deliverEvent(new InConnectionDown(host, cause));
    }

    /**
     * Forwards a received message to the listener, attributing it to the advertised listen
     * address for inbound connections and to the connection's peer for outbound ones.
     *
     * @throws AssertionError if the message arrived on an inbound connection that is not registered
     */
    @Override
    public void deliverMessage(T msg, Connection<T> conn) {
        assert (conn.getLoop().inEventLoop());
        boolean inbound = conn.isInbound();
        Host host;
        if (inbound) {
            synchronized (this.establishedIn) {
                host = this.establishedIn.getKey(conn);
            }
            if (host == null) {
                throw new AssertionError("Null host");
            }
        } else {
            host = conn.getPeer();
        }
        logger.debug("DeliverMessage {} {} {}", msg, host, inbound ? "IN" : "OUT");
        this.listener.deliverMessage(msg, host);
    }

    /** Logs the outcome of binding the server socket. */
    @Override
    public void serverSocketBind(boolean success, Throwable cause) {
        if (success) {
            logger.debug("Server socket ready");
        } else {
            // Concatenation on purpose: passing the Throwable as an argument would select the
            // (String, Throwable) overload and change the emitted text.
            logger.error("Server socket bind failed: " + cause);
        }
    }

    /** Logs that the server socket was closed, including the cause when it did not close cleanly. */
    @Override
    public void serverSocketClose(boolean success, Throwable cause) {
        logger.debug("Server socket closed. " + (success ? "" : "Cause: " + cause));
    }

    /**
     * Accepts a connection only if its attributes carry this channel's magic number.
     *
     * @param attr attributes sent by the remote side
     * @return {@code true} when the magic number attribute is present and matches
     */
    @Override
    public boolean validateAttributes(Attributes attr) {
        Short magic = attr.getShort(MAGIC_NUMBER_ATTRIBUTE);
        return magic != null && magic == TCP_MAGIC_NUMBER;
    }
}

