package io.transwarp.hive.service;

import io.transwarp.hadoop.hive.conf.HiveConf;
import io.transwarp.hadoop.hive.ql.metadata.schedule.PartitionManagementTask;
import io.transwarp.hadoop.hive.ql.metadata.schedule.ScheduleTaskThread;
import io.transwarp.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import io.transwarp.hive.service.utils.JavaUtils;
import io.transwarp.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.transwarp.thirdparty.org.apache.commons.lang.time.DateUtils;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

/* loaded from: input_file:io/transwarp/hive/service/SchedulerService.class */
public class SchedulerService extends AbstractService implements Runnable {
    private static final Log LOG = LogFactory.getLog(SchedulerService.class.getName());
    private String metaId;
    private HiveConf hiveConf;
    private Thread serviceThread;
    private CuratorFramework zkClient;
    private ScheduleInterProcessMutex scheduleLock;
    private boolean zkStarted;
    private boolean acquire;
    private final String serviceRootPath = "/SchedulerService";
    private byte[] lockBytes;
    private String zooKeeperEnsemble;
    private ScheduledExecutorService pool;
    private final AtomicBoolean stop;
    private final AtomicBoolean inScheduling;
    private final String[] taskNames;
    private final ACLProvider zooKeeperAclProvider;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/transwarp/hive/service/SchedulerService$ScheduleInterProcessMutex.class */
    public static class ScheduleInterProcessMutex extends InterProcessMutex {
        byte[] lockNodeBytes;

        public ScheduleInterProcessMutex(CuratorFramework curatorFramework, String str, byte[] bArr) {
            super(curatorFramework, str);
            this.lockNodeBytes = bArr;
        }

        protected byte[] getLockNodeBytes() {
            return this.lockNodeBytes;
        }

        protected String getLockPath() {
            return super.getLockPath();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/transwarp/hive/service/SchedulerService$ZKConnectionStateListener.class */
    public class ZKConnectionStateListener implements ConnectionStateListener {
        ZKConnectionStateListener() {
        }

        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss:SSS");
            if (curatorFramework == SchedulerService.this.zkClient && (connectionState == ConnectionState.LOST || connectionState == ConnectionState.RECONNECTED)) {
                SchedulerService.LOG.error("Zookeeper client lost connection, will reset zk status and stop the scheduler tasks");
                SchedulerService.this.resetStatus();
            }
            SchedulerService.LOG.info(simpleDateFormat.format(new Date()) + " : zkClient " + curatorFramework + " state " + connectionState.toString());
        }
    }

    public SchedulerService(String str) {
        super(SchedulerService.class.getSimpleName());
        this.zkStarted = false;
        this.acquire = false;
        this.serviceRootPath = "/SchedulerService";
        this.taskNames = new String[]{PartitionManagementTask.class.getName()};
        this.zooKeeperAclProvider = new ACLProvider() { // from class: io.transwarp.hive.service.SchedulerService.1
            final List<ACL> nodeAcls = new ArrayList();

            public List<ACL> getDefaultAcl() {
                this.nodeAcls.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE);
                return this.nodeAcls;
            }

            public List<ACL> getAclForPath(String str2) {
                return getDefaultAcl();
            }
        };
        this.metaId = str;
        this.inScheduling = new AtomicBoolean();
        this.stop = new AtomicBoolean();
    }

    @Override // io.transwarp.hive.service.AbstractService, io.transwarp.hive.service.Service
    public synchronized void init(HiveConf hiveConf) {
        super.init(hiveConf);
        this.hiveConf = hiveConf;
    }

    @Override // io.transwarp.hive.service.AbstractService, io.transwarp.hive.service.Service
    public synchronized void start() {
        super.start();
        this.serviceThread = new Thread(this);
        this.serviceThread.setDaemon(true);
        this.serviceThread.setName(SchedulerService.class.getSimpleName());
        this.serviceThread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        boolean z;
        initZookeeper();
        try {
            LOG.info("wait for async scheduler ready");
        } catch (Exception e) {
            LOG.error("Unknown exception happened", e);
        }
        do {
            if (this.inScheduling.get()) {
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e2) {
                    LOG.debug("SchedulerService interrupt");
                }
            } else {
                try {
                    connectZK();
                    acquireLock();
                    z = true;
                } catch (Exception e3) {
                    LOG.error("Caught an exception while connecting zk in the main loop of Scheduler Service, " + e3.getMessage());
                    resetStatus();
                    z = false;
                }
                if (z) {
                    initThreadPool();
                    for (String str : this.taskNames) {
                        Runnable runnable = (ScheduleTaskThread) JavaUtils.newInstance(JavaUtils.getClass(str, ScheduleTaskThread.class));
                        runnable.setConf(this.hiveConf);
                        long runFrequency = runnable.runFrequency(TimeUnit.MILLISECONDS);
                        this.pool.scheduleAtFixedRate(runnable, runFrequency, runFrequency, TimeUnit.MILLISECONDS);
                        LOG.info("Scheduling for " + runnable.getClass().getCanonicalName() + " service.");
                    }
                    this.inScheduling.set(true);
                }
            }
            LOG.error("Unknown exception happened", e);
            stop();
        } while (!this.stop.get());
        stop();
    }

    @Override // io.transwarp.hive.service.AbstractService, io.transwarp.hive.service.Service
    public synchronized void stop() {
        this.stop.set(true);
        if (this.zkStarted) {
            if (this.acquire) {
                try {
                    this.scheduleLock.release();
                } catch (Exception e) {
                }
            }
            if (this.zkClient != null) {
                this.zkClient.close();
            }
        }
        shutDownThreadPool();
        super.stop();
    }

    private void initZookeeper() {
        String zkQuorum = ZooKeeperHiveHelper.getZkQuorum(this.hiveConf);
        if (zkQuorum == null || zkQuorum.length() == 0) {
            throw new RuntimeException("Failed to get zookeeper quorum");
        }
        this.zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(this.hiveConf, zkQuorum);
        if (this.zooKeeperEnsemble.length() == 0) {
            throw new RuntimeException("Failed to get zookeeper quorum servers");
        }
        this.lockBytes = new byte[0];
    }

    private void connectZK() throws Exception {
        LOG.info("Connect to zookeeper");
        int timeVar = (int) HiveConf.getTimeVar(this.hiveConf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
        ZKConnectionStateListener zKConnectionStateListener = new ZKConnectionStateListener();
        this.zkClient = CuratorFrameworkFactory.builder().connectString(this.zooKeeperEnsemble).aclProvider(this.zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(DateUtils.MILLIS_IN_SECOND, 3)).sessionTimeoutMs(timeVar).connectionTimeoutMs(timeVar).build();
        this.zkClient.getConnectionStateListenable().addListener(zKConnectionStateListener);
        this.zkClient.start();
        this.zkStarted = true;
        LOG.info("Zookeeper start!");
        createZKNodeIfNeeded("/SchedulerService", true, new byte[0]);
        StringBuilder sb = new StringBuilder("/SchedulerService");
        createZKNodeIfNeeded(sb.append("/").append(this.metaId).toString(), true, new byte[0]);
        this.scheduleLock = new ScheduleInterProcessMutex(this.zkClient, sb.append("/").append("lock").toString(), this.lockBytes);
    }

    private void acquireLock() throws Exception {
        LOG.info("Try to acquire schedule lock");
        do {
        } while (!this.scheduleLock.acquire(3000L, TimeUnit.SECONDS));
        this.acquire = true;
        LOG.info("Got schedule lock");
        this.zkClient.setData().forPath(this.scheduleLock.getLockPath(), (new String(this.lockBytes, StandardCharsets.UTF_8) + "-working").getBytes(StandardCharsets.UTF_8));
    }

    private void createZKNodeIfNeeded(String str, boolean z, byte[] bArr) throws Exception {
        if (this.zkClient.checkExists().forPath(str) == null) {
            LOG.info("create async schedule node " + str + " in zookeeper");
            ((BackgroundPathAndBytesable) ((ACLBackgroundPathAndBytesable) this.zkClient.create().withMode(z ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL)).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)).forPath(str, bArr);
        }
    }

    private void initThreadPool() {
        if (this.pool == null) {
            this.pool = Executors.newScheduledThreadPool(this.taskNames.length, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduler Service Worker %d").build());
        }
    }

    private void shutDownThreadPool() {
        if (this.pool != null) {
            this.pool.shutdown();
            try {
                this.pool.awaitTermination(10L, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
            } finally {
                this.pool = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resetStatus() {
        LOG.info("Lost zk connection, try to reset schedule thread, check whether hive.zookeeper.session.timeout set too small");
        try {
            this.scheduleLock.release();
        } catch (Throwable th) {
            LOG.warn("Fail to release zk lock");
        }
        try {
            this.zkClient.close();
        } catch (Throwable th2) {
            LOG.warn("Fail to close zk client");
        }
        this.scheduleLock = null;
        this.zkClient = null;
        this.zkStarted = false;
        this.acquire = false;
        this.inScheduling.set(false);
        shutDownThreadPool();
    }
}
