/*
 * Decompiled with CFR 0.152.
 */
package org.openzen.packetstreams.qpsp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.openzen.packetstreams.qpsp.PacketScheduler;
import org.openzen.packetstreams.qpsp.QPSPEndpoint;
import org.openzen.packetstreams.qpsp.QPSPPacket;
import org.openzen.packetstreams.qpsp.QPSPSession;
import org.openzen.packetstreams.qpsp.StreamMultiplexer;
import org.openzen.packetstreams.qpsp.scheduler.CongestionController;

/**
 * {@link PacketScheduler} that pulls packets from a {@link StreamMultiplexer},
 * sends them to a {@link QPSPEndpoint} and re-queues non-lossy packets whose
 * acknowledgement does not arrive before a timeout.
 *
 * <p>Threading model: every state-changing operation is wrapped in a
 * {@link Runnable} and pushed onto {@link #commandQueue}; a single worker
 * thread started by the constructor executes those commands one at a time.
 * The in-flight list, the retransmit queue and the pause/close flags are
 * therefore only touched by that worker. The RTT fields and the last-sent
 * timestamp are additionally read by getters on the caller's thread, which is
 * why they are {@code volatile}.
 */
public class StandardPacketScheduler
implements PacketScheduler {
    /** Value of {@link #latestRTT} before the first acknowledgement has been measured. */
    private static final int INITIAL_RTT_MILLIS = 1000;
    /** Sending stops once more than this many packets timed out without anything being received. */
    private static final int MAX_PACKETS_LOST_WITHOUT_RESPONSE = 20;

    private final QPSPSession session;
    private final StreamMultiplexer multiplexer;
    /** Non-lossy packets that were sent and are waiting for an acknowledgement or a timeout. */
    private final List<InFlightPacket> inFlight = new ArrayList<InFlightPacket>();
    /** Packets whose timeout fired and which still have to be sent again. */
    private final Queue<QPSPPacket> retransmit = new PriorityQueue<QPSPPacket>();
    /** Fires the per-packet loss timeouts (see {@link InFlightPacket}). */
    private final Timer timer = new Timer();
    private final QPSPEndpoint target;
    /** Commands waiting to be executed by the worker thread. */
    private final BlockingQueue<Runnable> commandQueue = new LinkedBlockingQueue<Runnable>();
    private final CongestionController congestionController;
    // Written by the worker thread only, read from arbitrary threads through the getters.
    private volatile int latestRTT = INITIAL_RTT_MILLIS;
    /** Smoothed RTT in milliseconds; 0 means that no sample has been taken yet. */
    private volatile int srtt = 0;
    private boolean closed = false;
    private boolean paused = false;
    // volatile: a plain long may be read torn from another thread.
    private volatile long lastSentPacketTimestamp = System.currentTimeMillis();
    private int packetsLostSinceLastReceived = 0;

    /**
     * Creates the scheduler, registers it with the congestion controller and
     * starts the worker thread that executes queued commands.
     *
     * @param session session the packets belong to; also provides the logger
     * @param multiplexer source of new (not yet transmitted) packets
     * @param target endpoint the packet data is sent to
     * @param congestionController decides whether sending is currently allowed
     *        and is notified about sent, acknowledged and lost packets
     */
    public StandardPacketScheduler(QPSPSession session, StreamMultiplexer multiplexer, QPSPEndpoint target, CongestionController congestionController) {
        this.session = session;
        this.multiplexer = multiplexer;
        this.target = target;
        this.congestionController = congestionController;
        congestionController.setPacketScheduler(this);
        new Thread(this::runCommandLoop).start();
    }

    /**
     * Worker loop: executes queued commands until the session is closed or
     * the thread is interrupted.
     */
    private void runCommandLoop() {
        while (!this.closed) {
            Runnable command;
            try {
                command = this.commandQueue.take();
            }
            catch (InterruptedException ex) {
                // Restore the flag for whoever interrupted us and stop; with no
                // worker left the loss timers would only fill the queue.
                Thread.currentThread().interrupt();
                this.timer.cancel();
                return;
            }
            try {
                command.run();
            }
            catch (RuntimeException ex) {
                // A single failing command must not terminate the only thread
                // that drives this scheduler.
                ex.printStackTrace();
            }
        }
    }

    /** Queues a command that stops the scheduler from sending until {@link #resume()} is called. */
    @Override
    public void pause() {
        this.commandQueue.offer(() -> {
            this.paused = true;
        });
    }

    /** Queues a command that clears the paused state, resets the loss counter and sends what can be sent. */
    @Override
    public void resume() {
        this.commandQueue.offer(() -> {
            this.paused = false;
            this.packetsLostSinceLastReceived = 0;
            this.doResume();
        });
    }

    /** Queues a command that sends what can be sent, unless the scheduler is paused. */
    @Override
    public void resumeStreams() {
        this.commandQueue.offer(this::doResume);
    }

    /** Queues a command that resets the counter of packets lost since the last received packet. */
    @Override
    public void onPacketReceived() {
        this.commandQueue.offer(() -> {
            this.packetsLostSinceLastReceived = 0;
        });
    }

    /**
     * Queues the processing of an acknowledgement.
     *
     * @param streamID stream ID, matched against the packet's {@code localStreamID}
     * @param seq sequence number of the acknowledged packet
     */
    @Override
    public void onAcknowledged(long streamID, long seq) {
        this.commandQueue.offer(() -> this.acknowledge(streamID, seq));
    }

    /** Does nothing in this implementation. */
    @Override
    public void onStreamClosed(long streamID) {
    }

    /** Queues a command that cancels all loss timers and lets the worker thread terminate. */
    @Override
    public void onSessionClosed() {
        this.commandQueue.offer(() -> {
            this.timer.cancel();
            this.closed = true;
        });
    }

    /**
     * @return the mean of the smoothed RTT and the most recent RTT sample, in
     *         milliseconds
     */
    @Override
    public int getEstimatedRTTInMillis() {
        // Sum in long so that two large samples cannot overflow int.
        return (int)(((long)this.srtt + (long)this.latestRTT) / 2L);
    }

    /** @return {@link System#currentTimeMillis()} value recorded at the most recent send attempt */
    @Override
    public long getLastSentPacketTimestamp() {
        return this.lastSentPacketTimestamp;
    }

    /** Sends packets until {@link #sendPacket()} reports that it cannot continue; no-op while paused. */
    private void doResume() {
        if (this.paused) {
            return;
        }
        while (this.sendPacket()) {
        }
    }

    /**
     * Sends at most one packet: a queued retransmission if its priority is at
     * least the multiplexer's, otherwise the multiplexer's next packet.
     *
     * @return {@code true} if a packet was sent and the caller may try to send
     *         another one; {@code false} if sending is not possible right now
     *         (congestion, too many losses, nothing to send, I/O failure)
     */
    private boolean sendPacket() {
        if (this.congestionController.isCongested()) {
            return false;
        }
        if (this.packetsLostSinceLastReceived > MAX_PACKETS_LOST_WITHOUT_RESPONSE) {
            this.session.logger.log(8, -1L, "Stopped session transmission due to no response");
            return false;
        }

        QPSPPacket next;
        boolean isRetransmission;
        if (!this.retransmit.isEmpty() && this.retransmit.peek().priority >= this.multiplexer.peekPriority()) {
            next = this.retransmit.poll();
            isRetransmission = true;
        } else {
            next = this.multiplexer.next();
            isRetransmission = false;
            if (next == null) {
                // The multiplexer has nothing to offer: fall back to the
                // retransmit queue directly. Returning "try again" without
                // sending anything would make doResume() spin for as long as
                // the priority comparison above keeps preferring the multiplexer.
                next = this.retransmit.poll();
                isRetransmission = true;
            }
        }
        if (next == null) {
            return false;
        }

        if (!next.isLossy()) {
            // Track the packet and arm its loss timer.
            InFlightPacket tracked = new InFlightPacket(next, isRetransmission);
            this.inFlight.add(tracked);
            this.timer.schedule(tracked, this.getTransmissionTimeout());
        }
        String active = this.multiplexer.getActive();
        if (isRetransmission) {
            this.session.logger.log(8, next.localStreamID, "Retransmitting #" + next.seq + ", " + this.congestionController.getStateInfo());
        } else {
            this.session.logger.log(8, next.localStreamID, "Transmitting #" + next.seq + ", " + this.congestionController.getStateInfo() + ", active=" + active);
        }
        this.congestionController.onSent(next);
        try {
            this.lastSentPacketTimestamp = System.currentTimeMillis();
            this.target.send(this.session, next.data);
            return true;
        }
        catch (IOException ex) {
            // A tracked packet stays in flight, so its timer will still queue
            // it for retransmission.
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Handles an acknowledgement on the worker thread: stops tracking the
     * packet, updates the RTT estimate, informs the congestion controller and
     * continues sending.
     */
    private void acknowledge(long streamID, long seq) {
        this.packetsLostSinceLastReceived = 0;
        InFlightPacket acked = this.findInFlight(streamID, seq);
        if (acked == null) {
            // Not in flight (any more): reported to the controller as a duplicate.
            this.congestionController.onAckDuplicate(streamID, seq);
        } else {
            acked.cancel();
            this.inFlight.remove(acked);
            // NOTE(review): the sample is also taken for retransmitted packets,
            // measured from packet.timestamp, which is assigned outside this
            // class - confirm that this is the intended reference time.
            long elapsed = System.currentTimeMillis() - acked.packet.timestamp;
            // Clamp: a wall-clock step backwards would give a negative sample
            // (and later a negative timer delay); a huge one would wrap in int.
            int rtt = (int)Math.max(0L, Math.min((long)Integer.MAX_VALUE, elapsed));
            this.latestRTT = rtt;
            this.srtt = this.srtt == 0 ? rtt : (int)((float)this.srtt * 0.9f + (float)rtt * 0.1f);
            this.onAckLossless(acked.packet);
            if (acked.isRetransmission) {
                this.session.logger.log(8, streamID, "Ack " + seq + ", retransmitted packet, " + this.congestionController.getStateInfo());
            } else {
                this.session.logger.log(8, streamID, "Ack " + seq + ", rtt = " + rtt + ", " + this.congestionController.getStateInfo());
            }
        }
        // The packet may be waiting for retransmission; it no longer needs to be sent.
        QPSPPacket pending = this.findRetransmit(streamID, seq);
        if (pending != null) {
            this.retransmit.remove(pending);
        }
        this.doResume();
    }

    /** Reports the acknowledged packet to the congestion controller and continues sending. */
    private void onAckLossless(QPSPPacket packet) {
        this.congestionController.onAckLossless(packet);
        this.doResume();
    }

    /** Counts the loss, reports it to the congestion controller and continues sending. */
    private void onPacketLost(QPSPPacket packet) {
        ++this.packetsLostSinceLastReceived;
        this.congestionController.onPacketLost(packet);
        this.doResume();
    }

    /** @return the in-flight entry for the given stream and sequence number, or {@code null} */
    private InFlightPacket findInFlight(long streamID, long seq) {
        for (InFlightPacket candidate : this.inFlight) {
            if (candidate.packet.localStreamID == streamID && candidate.packet.seq == seq) {
                return candidate;
            }
        }
        return null;
    }

    /** @return the queued retransmission for the given stream and sequence number, or {@code null} */
    private QPSPPacket findRetransmit(long streamID, long seq) {
        for (QPSPPacket candidate : this.retransmit) {
            if (candidate.localStreamID == streamID && candidate.seq == seq) {
                return candidate;
            }
        }
        return null;
    }

    /** @return loss timeout in milliseconds: 1.5 times the larger of the smoothed and the latest RTT */
    private int getTransmissionTimeout() {
        return (int)((float)Math.max(this.srtt, this.latestRTT) * 1.5f);
    }

    /**
     * A sent, non-lossy packet together with its loss timer. When the timer
     * fires before the task is cancelled, the packet is treated as lost and
     * queued for retransmission.
     */
    private class InFlightPacket
    extends TimerTask {
        private final QPSPPacket packet;
        /** Whether this transmission was taken from the retransmit queue. */
        private final boolean isRetransmission;

        public InFlightPacket(QPSPPacket packet, boolean isRetransmission) {
            this.packet = packet;
            this.isRetransmission = isRetransmission;
        }

        /** Runs on the timer thread; hands the loss handling over to the worker thread. */
        @Override
        public void run() {
            StandardPacketScheduler.this.commandQueue.offer(() -> {
                StandardPacketScheduler.this.session.logger.log(8, this.packet.localStreamID, "Lost packet " + this.packet.seq);
                StandardPacketScheduler.this.retransmit.add(this.packet);
                StandardPacketScheduler.this.inFlight.remove(this);
                StandardPacketScheduler.this.onPacketLost(this.packet);
            });
        }
    }
}

