/*
 * Decompiled with CFR 0.152.
 */
package com.equestricraft.redis;

import com.equestricraft.common.util.ThreadUtils;
import com.equestricraft.logging.Log;
import com.equestricraft.redis.ObjectSerializationCodec;
import com.equestricraft.redis.RequestObject;
import com.equestricraft.redis.RequestPublishMessage;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.io.Serializable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

public class RequestHandler<T extends Serializable, R extends Serializable> {
    private static RedisClient redisClient = null;
    private static StatefulRedisPubSubConnection<String, RequestPublishMessage> requestHandlerNotificationChannel = null;
    private static final String CHANNEL_REQUESTS = "requester-requests";
    private static final String CHANNEL_RESPONSES = "requester-responses";
    private static final int TIMEOUT_SECONDS = 5;
    private static final Executor handlerExecutor = Executors.newFixedThreadPool(10);
    private final String channel;
    private Function<T, R> handlerFunction = null;
    private final Map<UUID, CompletableFuture<R>> futures = new HashMap<UUID, CompletableFuture<R>>();
    private final ReentrantLock futuresLock = new ReentrantLock();
    private RedisAsyncCommands<String, RequestObject<T>> requestCommands = null;
    private RedisAsyncCommands<String, R> responseCommands = null;
    private int failedConnectionAttempts = 0;
    private static final Log log = Log.getLogger(RequestHandler.class.getName());

    protected static void setUp(RedisClient redisClient) {
        RequestHandler.redisClient = redisClient;
        requestHandlerNotificationChannel = redisClient.connectPubSub(new ObjectSerializationCodec());
        requestHandlerNotificationChannel.setTimeout(Duration.ofSeconds(5L));
        requestHandlerNotificationChannel.async().subscribe(CHANNEL_REQUESTS, CHANNEL_RESPONSES);
    }

    protected RequestHandler(String channel) {
        this.channel = RequestHandler.prefix(channel);
        this.reconnect();
    }

    private static String prefix(String requestChannel) {
        return String.format("requester-%s", requestChannel);
    }

    private void reconnect() {
        try {
            this.connect();
        }
        catch (Exception ex) {
            ++this.failedConnectionAttempts;
            if (this.failedConnectionAttempts > 10) {
                log.error("Redis connection {} has failed {} times in a row", (Object)this.channel, (Object)this.failedConnectionAttempts);
            } else {
                log.info("Connection failed for channel {}, will retry", (Object)this.channel);
            }
            new Timer().schedule(new TimerTask(){

                @Override
                public void run() {
                    RequestHandler.this.reconnect();
                }
            }, Duration.ofSeconds(5L).toMillis());
        }
    }

    private void connect() {
        StatefulRedisConnection requestDataChannel = redisClient.connect(new ObjectSerializationCodec());
        requestDataChannel.setTimeout(Duration.ofSeconds(5L));
        this.requestCommands = requestDataChannel.async();
        StatefulRedisConnection responseDataChannel = redisClient.connect(new ObjectSerializationCodec());
        responseDataChannel.setTimeout(Duration.ofSeconds(5L));
        this.responseCommands = responseDataChannel.async();
        requestHandlerNotificationChannel.addListener((RedisPubSubListener<String, RequestPublishMessage>)new RedisPubSubAdapter<String, RequestPublishMessage>(){

            @Override
            public void message(String channel, RequestPublishMessage message) {
                if (RequestHandler.this.handlerFunction == null) {
                    return;
                }
                if (channel.equals(RequestHandler.CHANNEL_REQUESTS)) {
                    handlerExecutor.execute(() -> RequestHandler.this.handleRequest(message));
                }
            }
        });
        requestHandlerNotificationChannel.addListener((RedisPubSubListener<String, RequestPublishMessage>)new RedisPubSubAdapter<String, RequestPublishMessage>(){

            @Override
            public void message(String channel, RequestPublishMessage message) {
                if (RequestHandler.this.handlerFunction != null) {
                    return;
                }
                if (channel.equals(RequestHandler.CHANNEL_RESPONSES)) {
                    handlerExecutor.execute(() -> RequestHandler.this.handleResponse(message));
                }
            }
        });
        log.info("Channel {} has connected", (Object)this.channel);
    }

    private void handleRequest(RequestPublishMessage message) {
        if (!this.channel.equals(message.getChannelName())) {
            return;
        }
        RequestObject<T> request = this.getRequest(message.getRequestId());
        Serializable r = (Serializable)this.handlerFunction.apply(request.t());
        this.responseCommands.hset(this.channel, request.requestUuid().toString(), r);
        ThreadUtils.sleep(Duration.ofMillis(40L));
        RequestPublishMessage publishMessage = new RequestPublishMessage(this.channel, request.requestUuid());
        requestHandlerNotificationChannel.async().publish(CHANNEL_RESPONSES, publishMessage);
    }

    private RequestObject<T> getRequest(UUID requestId) {
        RedisFuture request = this.requestCommands.hget(this.channel, requestId.toString());
        try {
            if (request.await(5L, TimeUnit.SECONDS)) {
                this.requestCommands.hdel(this.channel, (String[])new String[]{requestId.toString()});
                return (RequestObject)request.get();
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
        throw new RedisException(String.format("Error getting request on %s channel", this.channel));
    }

    private void handleResponse(RequestPublishMessage message) {
        if (!this.channel.equals(message.getChannelName())) {
            return;
        }
        this.futuresLock.lock();
        try {
            if (this.futures.containsKey(message.getRequestId())) {
                R data = this.getResponse(message.getRequestId());
                CompletableFuture<R> future = this.futures.remove(message.getRequestId());
                future.complete(data);
                return;
            }
            throw new RedisException(String.format("Error getting future handler on %s channel", this.channel));
        }
        finally {
            this.futuresLock.unlock();
        }
    }

    private R getResponse(UUID requestId) {
        RedisFuture response = this.responseCommands.hget(this.channel, requestId.toString());
        try {
            if (response.await(5L, TimeUnit.SECONDS)) {
                this.responseCommands.hdel(this.channel, (String[])new String[]{requestId.toString()});
                return (R)((Serializable)response.get());
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
        throw new RedisException("Error getting data on list");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final R get(T data) {
        UUID uuid = UUID.randomUUID();
        RequestObject<T> requestObject = new RequestObject<T>(uuid, data);
        CompletableFuture future = new CompletableFuture();
        this.futuresLock.lock();
        try {
            this.futures.put(uuid, future);
        }
        finally {
            this.futuresLock.unlock();
        }
        this.requestCommands.hset(this.channel, uuid.toString(), requestObject);
        ThreadUtils.sleep(Duration.ofMillis(40L));
        RequestPublishMessage publishMessage = new RequestPublishMessage(this.channel, uuid);
        requestHandlerNotificationChannel.async().publish(CHANNEL_REQUESTS, publishMessage);
        try {
            return (R)((Serializable)future.get(5L, TimeUnit.SECONDS));
        }
        catch (ExecutionException | TimeoutException ex) {
            throw new RedisException("Timed out sending data to the server");
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public final void handle(Function<T, R> handler) {
        this.handlerFunction = handler;
    }
}

