/*
 * Decompiled with CFR 0.152.
 */
package com.talpie.linker;

import com.talpie.linker.ClientHandler;
import com.talpie.linker.Config;
import com.talpie.linker.DataSocketServer;
import com.talpie.linker.Message;
import com.talpie.linker.ServerListenersHandler;
import com.talpie.linker.Service;
import com.talpie.linker.StatiCom;
import com.talpie.linker.StreamFrame;
import com.talpie.linker.StreamSocketServer;
import com.talpie.linker.SystemInfo;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ServerService
implements Service {
    private final Config config;
    private SystemInfo systemInfo = new SystemInfo();
    private final ExecutorService serverExecutor = Executors.newSingleThreadExecutor();
    private final ConcurrentHashMap<String, ClientHandler> activeClients = new ConcurrentHashMap();
    private final ServerListenersHandler listenersHandlers = new ServerListenersHandler();
    private volatile boolean running = false;
    private ServerSocket serverSocket;
    private final ConcurrentHashMap<String, String> pendingStreams = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, StreamSocketServer> activeStreams = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, CompletableFuture<StreamSocketServer>> streamAwaiters = new ConcurrentHashMap();
    private static final ScheduledExecutorService SCHED = Executors.newSingleThreadScheduledExecutor();

    void removeActiveStream(String streamId, StreamSocketServer who) {
        this.activeStreams.compute(streamId, (k, v) -> v == who ? null : v);
    }

    public StreamSocketServer getActiveStream(String streamId) {
        return this.activeStreams.get(streamId);
    }

    public Config getConfig() {
        return this.config;
    }

    public boolean isRunning() {
        return this.running;
    }

    public SystemInfo getSystemInfo() {
        return this.systemInfo;
    }

    public ServerListenersHandler getListenersHandlers() {
        return this.listenersHandlers;
    }

    public ServerService(int port) {
        this.config = new Config(port);
    }

    @Override
    public void start() {
        try {
            this.serverSocket = new ServerSocket(this.config.getPort());
            this.running = true;
            this.serverExecutor.submit(this::socketAcceptLoop);
            this.listenersHandlers.start(this);
        }
        catch (Exception e) {
            this.getListenersHandlers().error(this, e);
        }
    }

    @Override
    public void stop() {
        this.running = false;
        try {
            this.serverExecutor.shutdownNow();
            this.activeClients.forEach((clientId, handler) -> handler.stop("Server stopped."));
            this.serverSocket.close();
            this.listenersHandlers.stop(this);
        }
        catch (Exception e) {
            this.getListenersHandlers().error(this, e);
        }
    }

    @Override
    public Service.Type getType() {
        return Service.Type.SERVER;
    }

    public void removeClient(String clientId, String cause) {
        this.listenersHandlers.clientDisconnected(this, this.activeClients.remove(clientId), new Exception(cause));
    }

    private static <T> CompletableFuture<T> failedFuture(Throwable ex) {
        CompletableFuture f = new CompletableFuture();
        f.completeExceptionally(ex);
        return f;
    }

    public CompletableFuture<Message> sendControlRequestTo(String clientId, String route, byte[] payload, long timeoutMillis) {
        ClientHandler ch = this.activeClients.get(clientId);
        if (ch == null) {
            return ServerService.failedFuture(new IllegalArgumentException("Unknown clientId: " + clientId));
        }
        return ch.sendControlRequest(route, payload, timeoutMillis);
    }

    public CompletableFuture<Message> sendControlRequestTo(String clientId, String route, byte[] payload) {
        return this.sendControlRequestTo(clientId, route, payload, 0L);
    }

    public CompletableFuture<Message> sendRequestTo(String clientId, String route, byte[] data, long timeoutMillis) {
        ClientHandler ch = this.activeClients.get(clientId);
        if (ch == null) {
            return ServerService.failedFuture(new IllegalArgumentException("Unknown clientId: " + clientId));
        }
        return ch.sendRequest(route, data, timeoutMillis);
    }

    public CompletableFuture<Message> sendRequestTo(String clientId, String route, byte[] data) {
        return this.sendRequestTo(clientId, route, data, 0L);
    }

    public Map<String, CompletableFuture<Message>> broadcastControlRequest(String route, byte[] payload, long timeoutMillis) {
        ConcurrentHashMap<String, CompletableFuture<Message>> result = new ConcurrentHashMap<String, CompletableFuture<Message>>();
        this.activeClients.forEach((id, ch) -> {
            try {
                result.put((String)id, ch.sendControlRequest(route, payload, timeoutMillis));
            }
            catch (Exception e) {
                result.put((String)id, ServerService.failedFuture(e));
            }
        });
        return result;
    }

    public Map<String, CompletableFuture<Message>> broadcastControlRequest(String route, byte[] payload) {
        return this.broadcastControlRequest(route, payload, 0L);
    }

    public Map<String, CompletableFuture<Message>> broadcastRequest(String route, byte[] data, long timeoutMillis) {
        ConcurrentHashMap<String, CompletableFuture<Message>> result = new ConcurrentHashMap<String, CompletableFuture<Message>>();
        this.activeClients.forEach((id, ch) -> {
            try {
                result.put((String)id, ch.sendRequest(route, data, timeoutMillis));
            }
            catch (Exception e) {
                result.put((String)id, ServerService.failedFuture(e));
            }
        });
        return result;
    }

    public Map<String, CompletableFuture<Message>> broadcastRequest(String route, byte[] data) {
        return this.broadcastRequest(route, data, 0L);
    }

    private void socketAcceptLoop() {
        while (this.running) {
            try {
                String clientId;
                Socket clientSocket = this.serverSocket.accept();
                clientSocket.setTcpNoDelay(true);
                clientSocket.setKeepAlive(true);
                clientSocket.setReceiveBufferSize(0x100000);
                clientSocket.setSendBufferSize(0x100000);
                String initLine = StatiCom.readLineString(clientSocket.getInputStream());
                if (initLine.equals("#_START-MAIN-SOCKET++")) {
                    if (this.activeClients.size() >= this.config.getMaxActiveClients()) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Max amount of clients is already connected to server (" + this.config.getMaxActiveClients() + "). Please wait for free slot and try again.");
                        clientSocket.close();
                        continue;
                    }
                    clientId = UUID.randomUUID().toString();
                    ClientHandler clientHandler = new ClientHandler(this, clientSocket, clientId);
                    this.activeClients.put(clientId, clientHandler);
                    clientHandler.start();
                    this.listenersHandlers.clientConnected(this, clientHandler);
                    continue;
                }
                if (initLine.startsWith("#_START-DATA-SOCKET++")) {
                    clientId = initLine.substring(21, 57);
                    String socketId = initLine.substring(57, 93);
                    String signature = initLine.substring(93);
                    ClientHandler clientHandler = this.activeClients.get(clientId);
                    if (clientHandler == null) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Unknown clientId");
                        clientSocket.close();
                        continue;
                    }
                    if (!clientHandler.getPendingDataSockets().contains(socketId)) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Entered incorrect socket ID: " + socketId);
                        clientSocket.close();
                        continue;
                    }
                    if (!clientHandler.getRsa().verify(clientId + socketId, signature)) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Signature verification has failed: " + signature);
                        clientSocket.close();
                        continue;
                    }
                    boolean samePeer = clientSocket.getInetAddress().equals(clientHandler.getSocket().getInetAddress());
                    boolean sameLocal = clientSocket.getLocalAddress().equals(clientHandler.getSocket().getLocalAddress());
                    if (!samePeer || !sameLocal) {
                        String msg = String.format("ERROR: Data socket host mismatch. peer=%s(local=%s) vs control peer=%s(local=%s)", clientSocket.getInetAddress().getHostAddress(), clientSocket.getLocalAddress().getHostAddress(), clientHandler.getSocket().getInetAddress().getHostAddress(), clientHandler.getSocket().getLocalAddress().getHostAddress());
                        StatiCom.writeLine(clientSocket.getOutputStream(), msg);
                        clientSocket.close();
                        this.getListenersHandlers().error(this, new Exception(msg));
                        continue;
                    }
                    DataSocketServer dataSocket = new DataSocketServer(clientHandler, clientSocket, socketId);
                    clientHandler.getPendingDataSockets().remove(socketId);
                    clientHandler.addDataSocket(socketId, dataSocket);
                    dataSocket.start();
                    this.listenersHandlers.dataOpen(this, clientHandler, dataSocket);
                    continue;
                }
                if (initLine.startsWith("#_START-STREAM-SOCKET++")) {
                    String PREFIX = "#_START-STREAM-SOCKET++";
                    int P = "#_START-STREAM-SOCKET++".length();
                    int UUID_LEN = 36;
                    if (initLine.length() < P + 72) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Bad init line length");
                        clientSocket.close();
                        continue;
                    }
                    String clientId2 = initLine.substring(P, P + 36);
                    String streamId = initLine.substring(P + 36, P + 72);
                    String signature = initLine.substring(P + 72);
                    ClientHandler ch = this.activeClients.get(clientId2);
                    if (ch == null) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Unknown clientId");
                        clientSocket.close();
                        continue;
                    }
                    String expected = this.pendingStreams.get(streamId);
                    if (expected == null || !expected.equals(clientId2)) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Stream not authorized: " + streamId);
                        clientSocket.close();
                        continue;
                    }
                    if (!ch.getRsa().verify(clientId2 + streamId, signature)) {
                        StatiCom.writeLine(clientSocket.getOutputStream(), "ERROR: Signature verification failed");
                        clientSocket.close();
                        continue;
                    }
                    boolean samePeer = clientSocket.getInetAddress().equals(ch.getSocket().getInetAddress());
                    boolean sameLocal = clientSocket.getLocalAddress().equals(ch.getSocket().getLocalAddress());
                    if (!samePeer || !sameLocal) {
                        String msg = String.format("ERROR: Stream socket host mismatch peer=%s(local=%s) vs control peer=%s(local=%s)", clientSocket.getInetAddress().getHostAddress(), clientSocket.getLocalAddress().getHostAddress(), ch.getSocket().getInetAddress().getHostAddress(), ch.getSocket().getLocalAddress().getHostAddress());
                        StatiCom.writeLine(clientSocket.getOutputStream(), msg);
                        clientSocket.close();
                        this.getListenersHandlers().error(this, new Exception(msg));
                        continue;
                    }
                    StreamSocketServer ss = new StreamSocketServer(ch, clientSocket, streamId, 64);
                    this.pendingStreams.remove(streamId);
                    this.activeStreams.put(streamId, ss);
                    ss.start();
                    CompletableFuture<StreamSocketServer> w = this.streamAwaiters.remove(streamId);
                    if (w == null || w.isDone()) continue;
                    w.complete(ss);
                    continue;
                }
                String error = "ERROR: Unknown initiation message: " + initLine;
                StatiCom.writeLine(clientSocket.getOutputStream(), error);
                clientSocket.close();
                this.listenersHandlers.error(this, new Exception(error));
            }
            catch (Exception e) {
                this.getListenersHandlers().error(this, e);
            }
        }
    }

    public CompletableFuture<StreamSocketServer> openStreamFor(String clientId, String streamId, int queueSize, long timeoutMs) {
        this.pendingStreams.put(streamId, clientId);
        CompletableFuture<Message> ctrl = this.sendControlRequestTo(clientId, "#_SYS++STREAM/OPEN-REQ", streamId.getBytes(StandardCharsets.UTF_8), timeoutMs);
        CompletableFuture waiter = new CompletableFuture();
        this.streamAwaiters.put(streamId, waiter);
        if (timeoutMs > 0L) {
            SCHED.schedule(() -> {
                CompletableFuture<StreamSocketServer> w = this.streamAwaiters.remove(streamId);
                if (w != null && !w.isDone()) {
                    w.completeExceptionally(new TimeoutException("Stream open timeout: " + streamId));
                }
                this.pendingStreams.remove(streamId);
            }, timeoutMs, TimeUnit.MILLISECONDS);
        }
        return ctrl.thenCompose(x -> waiter);
    }

    public boolean sendThroughStream(String streamId, StreamFrame frame) {
        StreamSocketServer ss = this.activeStreams.get(streamId);
        return ss != null && ss.offerFrame(frame);
    }

    public CompletableFuture<Void> closeStreamFor(String clientId, String streamId, long timeoutMs) {
        StreamSocketServer ss = this.activeStreams.remove(streamId);
        if (ss != null) {
            ss.stop();
        }
        return this.sendControlRequestTo(clientId, "#_SYS++STREAM/CLOSE", streamId.getBytes(StandardCharsets.UTF_8), timeoutMs).thenAccept(x -> {});
    }

    void authorizeStream(String clientId, String streamId) {
        this.pendingStreams.put(streamId, clientId);
    }
}

