package io.transwarp.hive.service;

import io.transwarp.hadoop.hive.conf.HiveConf;
import io.transwarp.hadoop.hive.metastore.api.CompactionTableType;
import io.transwarp.hadoop.hive.ql.txn.compactor.CompactDriver;
import io.transwarp.hadoop.hive.ql.txn.compactor.CompactMSDelegate;
import io.transwarp.hadoop.hive.ql.txn.compactor.CompactThread;
import io.transwarp.hadoop.hive.ql.txn.compactor.CompactionException;
import io.transwarp.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import io.transwarp.hadoop.util.StringUtils;
import io.transwarp.thirdparty.org.apache.commons.lang.time.DateUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

/* loaded from: input_file:io/transwarp/hive/service/AbstractCompactService.class */
public abstract class AbstractCompactService extends AbstractService implements Runnable {
    // Curator client created lazily in run() when ZooKeeper locking is used; nulled by resetStatus().
    private CuratorFramework zkClient;
    // Mutex on lockPath; created lazily in run() and nulled by resetStatus().
    private CompactorInterProcessMutex compactLock;
    // Milliseconds between main-loop iterations. init() reads ORC_COMPACT_SERVICE_CHECK_INTERVAL;
    // run() switches to ORC_COMPACT_THREAD_CHECK_INTERVAL once the compact threads are started.
    private long checkInterval;
    // Exit flag for the run() loop. Only assigned in init(); set by stop() and by the lock-path listener.
    private AtomicBoolean stop;
    // True once this instance has started its compact threads.
    private boolean isCompactProcess;
    // True after zkClient.start() has been called.
    private boolean zkStarted;
    // True after compactLock.acquire() has returned.
    private boolean acquire;
    // Watches the children of lockPath; only created when addLockRevokeListener is true.
    private PathChildrenCache pathChildrenCache;
    // Single-thread executor on which the pathChildrenCache listener runs.
    private ExecutorService executors;
    // When true, run() registers a listener on lockPath after acquiring the lock. Defaults to false.
    protected boolean addLockRevokeListener;
    // Built in run() as compactNodeName + "/" + metaId + "/" + compactLockName.
    protected String lockPath;
    // Closed and nulled by stopCompactThreads(); presumably assigned by startCompactorInitiator() — confirm in subclasses.
    protected CompactThread initiatorThread;
    // Closed and nulled by stopCompactThreads(); presumably assigned by startCompactorCleaner() — confirm in subclasses.
    protected CompactThread cleanerThread;
    // Entries that are no longer alive are replaced by checkWorker(); presumably populated by startCompactorWorkers() — confirm.
    protected List<CompactThread> workerThreads;
    // Not read in this class; presumably managed by subclasses — confirm.
    protected CompactMSDelegate compactMSDelegate;
    // HIVE_COMPACTOR_SLEEP_TIME_FOR_READ_ONLY_METASTORE in milliseconds, read in init(); not used in this class.
    protected long readOnlySleepTime;
    // Must be non-empty once initializeParams() has run, otherwise run() fails; used as a ZooKeeper path segment.
    protected String metaId;
    // When false, run() skips ZooKeeper entirely and starts the compact threads directly.
    protected boolean needAcquireLoks;
    // ZooKeeper path of the root node under which the per-metaId node is created.
    protected String compactNodeName;
    // Last path segment of lockPath.
    protected String compactLockName;
    // Result of generateLockNodeBytes(); handed to CompactorInterProcessMutex as the lock node payload.
    protected byte[] lockBytes;
    // Marker searched for in child data: a CHILD_ADDED event whose data contains it sets the stop flag.
    protected static final String SERVER_COMPACT_SERVICE = "server-compactor";
    // Per-thread driver map, initialised through getDriverMap(); the value is null if that call throws CompactionException.
    protected ThreadLocal<Map<CompactionTableType, CompactDriver>> driverMap;
    // ACL provider passed to the Curator client builder; it supplies ZooDefs.Ids.OPEN_ACL_UNSAFE.
    private final ACLProvider zooKeeperAclProvider;
    public static final Log LOG = LogFactory.getLog(AbstractCompactService.class.getName());
    // Not used in this class; presumably a seed for compact thread ids in subclasses — confirm.
    protected static int nextThreadId = 1000000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/transwarp/hive/service/AbstractCompactService$CompactorInterProcessMutex.class */
    public class CompactorInterProcessMutex extends InterProcessMutex {
        byte[] lockNodeBytes;

        public CompactorInterProcessMutex(CuratorFramework curatorFramework, String str, byte[] bArr) {
            super(curatorFramework, str);
            this.lockNodeBytes = bArr;
        }

        protected byte[] getLockNodeBytes() {
            return this.lockNodeBytes;
        }

        protected String getLockPath() {
            return super.getLockPath();
        }
    }

    /* loaded from: input_file:io/transwarp/hive/service/AbstractCompactService$ZKConnectionStateListener.class */
    class ZKConnectionStateListener implements ConnectionStateListener {
        ZKConnectionStateListener() {
        }

        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss:SSS");
            if (curatorFramework == AbstractCompactService.this.zkClient && (connectionState == ConnectionState.LOST || connectionState == ConnectionState.RECONNECTED)) {
                AbstractCompactService.LOG.error("Zookeeper client lost connection, will reset zk status and stop the compact threads");
                AbstractCompactService.this.resetStatus();
            }
            AbstractCompactService.LOG.info(simpleDateFormat.format(new Date()) + " : zkClient " + curatorFramework + " state " + connectionState.toString());
        }
    }

    /**
     * Creates the service in its idle state: no ZooKeeper client, no lock, no compact threads.
     * The stop flag and the check intervals are set later, in {@code init(HiveConf)}.
     *
     * @param str service name, passed to the superclass
     */
    public AbstractCompactService(String str) {
        super(str);
        this.checkInterval = 0L;
        this.isCompactProcess = false;
        this.zkStarted = false;
        this.acquire = false;
        this.addLockRevokeListener = false;
        this.initiatorThread = null;
        this.cleanerThread = null;
        this.workerThreads = null;
        this.compactMSDelegate = null;
        this.driverMap = new ThreadLocal<Map<CompactionTableType, CompactDriver>>() { // from class: io.transwarp.hive.service.AbstractCompactService.1
            /**
             * Builds the per-thread driver map through getDriverMap(); yields null when that fails.
             */
            @Override // java.lang.ThreadLocal
            protected Map<CompactionTableType, CompactDriver> initialValue() {
                try {
                    return AbstractCompactService.this.getDriverMap();
                } catch (CompactionException e) {
                    // Keep the original message but also record the cause.
                    AbstractCompactService.LOG.warn(e.getMessage(), e);
                    return null;
                }
            }
        };
        this.zooKeeperAclProvider = new ACLProvider() { // from class: io.transwarp.hive.service.AbstractCompactService.4
            /**
             * Returns a fresh copy of OPEN_ACL_UNSAFE on every call. The previous version
             * appended to one shared list per call, so it grew without bound and
             * accumulated duplicate ACL entries.
             */
            @Override
            public List<ACL> getDefaultAcl() {
                return new ArrayList<ACL>(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }

            @Override
            public List<ACL> getAclForPath(String str2) {
                return getDefaultAcl();
            }
        };
    }

    /**
     * First call made by run(), before initializeParams(). Presumably prepares
     * {@code compactMSDelegate} — confirm against the subclasses.
     *
     * @throws CompactionException on failure; run() wraps it in a RuntimeException
     */
    protected abstract void initCompactionMSDelegate() throws CompactionException;

    /**
     * Not called from this class. Presumably returns the metastore delegate used by the
     * compact threads — confirm against the subclasses.
     *
     * @throws CompactionException on failure
     */
    protected abstract CompactMSDelegate getCompactMSDelegate() throws CompactionException;

    /**
     * Supplies the per-thread initial value of {@code driverMap}.
     *
     * @return the map stored for the calling thread
     * @throws CompactionException on failure; the thread-local initialiser logs it at warn level and yields null
     */
    protected abstract Map<CompactionTableType, CompactDriver> getDriverMap() throws CompactionException;

    /**
     * Called once by run(); the result is stored in {@code lockBytes} and handed to the
     * ZooKeeper mutex as the lock node payload. After the lock is acquired run() decodes
     * these bytes as UTF-8 and appends "-working", so the value is expected to be UTF-8 text.
     */
    protected abstract byte[] generateLockNodeBytes();

    /**
     * Called by run() right after initCompactionMSDelegate(). run() then requires
     * {@code metaId} to be non-empty and reads {@code needAcquireLoks},
     * {@code compactNodeName} and {@code compactLockName} — presumably all set here; confirm.
     *
     * @throws Exception on failure; run() wraps it in a RuntimeException
     */
    protected abstract void initializeParams() throws Exception;

    /**
     * Called by startCompactThreads(), and again by checkInitiator() when the current
     * initiator thread is no longer alive. Presumably assigns {@code initiatorThread} — confirm.
     */
    protected abstract void startCompactorInitiator() throws Exception;

    /**
     * Called by startCompactThreads(), and again by checkCleaner() when the current
     * cleaner thread is no longer alive. Presumably assigns {@code cleanerThread} — confirm.
     */
    protected abstract void startCompactorCleaner() throws Exception;

    /**
     * Called by startCompactThreads(). Presumably populates {@code workerThreads} — confirm.
     */
    protected abstract void startCompactorWorkers() throws Exception;

    /**
     * Called by checkWorker() for every worker that is no longer alive.
     *
     * @return the thread that replaces the dead entry in {@code workerThreads}
     */
    protected abstract CompactThread startCompactorWorker() throws Exception;

    /**
     * Initialises the superclass, then reads both timing settings (in milliseconds) from
     * the configuration and creates the stop flag in its cleared state.
     *
     * @param hiveConf configuration passed to the superclass
     */
    @Override // io.transwarp.hive.service.AbstractService, io.transwarp.hive.service.Service
    public synchronized void init(HiveConf hiveConf) {
        super.init(hiveConf);
        final TimeUnit millis = TimeUnit.MILLISECONDS;
        long serviceCheckInterval = getHiveConf().getTimeVar(HiveConf.ConfVars.ORC_COMPACT_SERVICE_CHECK_INTERVAL, millis);
        this.checkInterval = serviceCheckInterval;
        long readOnlySleep = HiveConf.getTimeVar(getHiveConf(), HiveConf.ConfVars.HIVE_COMPACTOR_SLEEP_TIME_FOR_READ_ONLY_METASTORE, millis);
        this.readOnlySleepTime = readOnlySleep;
        this.stop = new AtomicBoolean(false);
    }

    /**
     * Main loop of the compact service.
     *
     * <p>After the one-off initialisation, every iteration does one of the following:
     * <ul>
     *   <li>if the compact threads are already running, restarts any that died;</li>
     *   <li>if no ZooKeeper quorum is to be used, starts the compact threads directly;</li>
     *   <li>otherwise connects to ZooKeeper, acquires the compact lock, then starts the
     *       compact threads.</li>
     * </ul>
     * The loop ends once the stop flag is set. Cleanup (listener, lock, ZooKeeper client,
     * {@link #stop()}) runs once, after the loop. It used to sit in a {@code finally}
     * inside the loop body, which shut the service down after the first iteration and made
     * the thread health checks unreachable.
     *
     * @throws RuntimeException wrapping any exception from initialisation or final cleanup
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            initCompactionMSDelegate();
            initializeParams();
            String zkQuorum = ZooKeeperHiveHelper.getZkQuorum(getHiveConf());
            this.lockBytes = generateLockNodeBytes();
            String quorumServers = (!this.needAcquireLoks || zkQuorum == null || zkQuorum.length() == 0) ? null : ZooKeeperHiveHelper.getQuorumServers(getHiveConf(), zkQuorum);
            if (this.metaId == null || this.metaId.length() == 0) {
                throw new RuntimeException("Failed to get metaId");
            }
            try {
                do {
                    runCheckCycle(quorumServers);
                } while (!this.stop.get());
            } finally {
                try {
                    releaseZkResources();
                } finally {
                    // Always stop the compact threads, even if releasing ZooKeeper resources failed.
                    stop();
                }
            }
        } catch (Exception e4) {
            LOG.error("Failed to init thread env for compact service: " + e4.getMessage(), e4);
            throw new RuntimeException(e4);
        }
    }

    /** Runs one loop iteration and sleeps for the rest of the check interval; never throws. */
    private void runCheckCycle(String quorumServers) {
        long cycleStart = System.currentTimeMillis();
        try {
            if (this.isCompactProcess) {
                checkInitiator();
                checkWorker();
                checkCleaner();
            } else if (quorumServers == null || quorumServers.length() == 0) {
                startCompactThreads();
                switchToCompactProcess();
            } else {
                connectZkIfNeeded(quorumServers);
                createCompactLockIfNeeded();
                // Blocks until this instance owns the lock.
                this.compactLock.acquire();
                this.acquire = true;
                this.zkClient.setData().forPath(this.compactLock.getLockPath(), (new String(this.lockBytes, Charset.forName("UTF-8")) + "-working").getBytes(Charset.forName("UTF-8")));
                startCompactThreads();
                switchToCompactProcess();
                if (this.addLockRevokeListener) {
                    registerLockRevokeListener();
                }
            }
            long elapsed = System.currentTimeMillis() - cycleStart;
            if (elapsed < this.checkInterval && !this.stop.get()) {
                Thread.sleep(this.checkInterval - elapsed);
            }
        } catch (Throwable th) {
            LOG.error("Caught an exception in the main loop of compact service checker, " + StringUtils.stringifyException(th));
            try {
                Thread.sleep(this.checkInterval);
            } catch (Throwable ignored) {
                // Best-effort back-off, as before: a failed or interrupted sleep only shortens
                // the pause, and the loop condition decides whether to continue.
            }
        }
    }

    /** Builds and starts the Curator client on first use. */
    private void connectZkIfNeeded(String quorumServers) {
        if (this.zkClient != null) {
            return;
        }
        int timeVar = (int) HiveConf.getTimeVar(getHiveConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
        ZKConnectionStateListener zKConnectionStateListener = new ZKConnectionStateListener();
        this.zkClient = CuratorFrameworkFactory.builder().connectString(quorumServers).aclProvider(this.zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(DateUtils.MILLIS_IN_SECOND, 3)).sessionTimeoutMs(timeVar).connectionTimeoutMs(timeVar).build();
        this.zkClient.getConnectionStateListenable().addListener(zKConnectionStateListener);
        this.zkClient.start();
        this.zkStarted = true;
    }

    /** Creates the persistent parent nodes and the mutex object on first use. */
    private void createCompactLockIfNeeded() throws Exception {
        if (this.compactLock != null) {
            return;
        }
        createZKNodeIfNeeded(this.compactNodeName, true, new byte[0]);
        StringBuilder sb = new StringBuilder(this.compactNodeName);
        createZKNodeIfNeeded(sb.append("/").append(this.metaId).toString(), true, new byte[0]);
        this.lockPath = sb.append("/").append(this.compactLockName).toString();
        this.compactLock = new CompactorInterProcessMutex(this.zkClient, this.lockPath, this.lockBytes);
    }

    /** Marks this instance as the compact process and switches to the thread check interval. */
    private void switchToCompactProcess() {
        this.isCompactProcess = true;
        this.checkInterval = getHiveConf().getTimeVar(HiveConf.ConfVars.ORC_COMPACT_THREAD_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /** Watches the lock path and requests a stop when a server-side compactor registers there. */
    private void registerLockRevokeListener() throws Exception {
        this.pathChildrenCache = new PathChildrenCache(this.zkClient, this.lockPath, true);
        this.pathChildrenCache.start();
        this.executors = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: io.transwarp.hive.service.AbstractCompactService.2
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "CsPathListenerExecutor-" + runnable.hashCode());
            }
        });
        this.pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: io.transwarp.hive.service.AbstractCompactService.3
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                ChildData childData = null;
                if (pathChildrenCacheEvent != null) {
                    childData = pathChildrenCacheEvent.getData();
                }
                if (childData == null) {
                    AbstractCompactService.LOG.warn("ChildData is null");
                    return;
                }
                byte[] data = childData.getData();
                if (data == null) {
                    AbstractCompactService.LOG.warn("ChildData is empty, nothing to get");
                    return;
                }
                // Decode with the same charset that is used when the lock payload is written.
                String str = new String(data, Charset.forName("UTF-8"));
                if (pathChildrenCacheEvent.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED || !str.contains(AbstractCompactService.SERVER_COMPACT_SERVICE)) {
                    AbstractCompactService.LOG.info(str + " " + pathChildrenCacheEvent.getType().name() + " event detected of metaId:" + AbstractCompactService.this.metaId + ",do noting");
                } else {
                    AbstractCompactService.LOG.info(str + " " + pathChildrenCacheEvent.getType().name() + " event detected,release lock of msc under metaId path" + AbstractCompactService.this.metaId + " and close current compact service!");
                    AbstractCompactService.this.stop.set(true);
                }
            }
        }, this.executors);
    }

    /** Closes the lock-path listener, releases the lock if held and closes the ZooKeeper client. */
    private void releaseZkResources() {
        if (this.addLockRevokeListener) {
            PathChildrenCache cache = this.pathChildrenCache;
            if (cache != null) {
                try {
                    cache.close();
                } catch (IOException e) {
                    LOG.warn("Fail to close lock path cache", e);
                }
            }
            ExecutorService listenerExecutor = this.executors;
            if (listenerExecutor != null) {
                listenerExecutor.shutdown();
                try {
                    listenerExecutor.awaitTermination(10L, TimeUnit.MINUTES);
                } catch (InterruptedException e2) {
                    // The service thread is about to exit; carry on so the lock and the
                    // ZooKeeper client are still released.
                    LOG.warn("Interrupted while waiting for lock path listener executor to terminate", e2);
                }
            }
        }
        if (this.zkStarted) {
            // Local copies: resetStatus() may null these fields from the ZooKeeper event thread.
            CompactorInterProcessMutex lock = this.compactLock;
            if (this.acquire && lock != null) {
                try {
                    lock.release();
                } catch (Exception e3) {
                    LOG.warn("Fail to release zk lock", e3);
                }
            }
            CuratorFramework client = this.zkClient;
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Stops the compact threads, requests the run() loop to exit and stops the superclass.
     *
     * <p>The stop flag is only created in {@code init(HiveConf)}, so it is checked for null:
     * calling stop() on a service that was never initialised used to throw a
     * NullPointerException before reaching {@code super.stop()}.
     */
    @Override // io.transwarp.hive.service.AbstractService, io.transwarp.hive.service.Service
    public synchronized void stop() {
        stopCompactThreads();
        AtomicBoolean stopFlag = this.stop;
        if (stopFlag != null) {
            stopFlag.set(true);
        }
        super.stop();
    }

    /**
     * Starts the compact threads in a fixed order: initiator, workers, cleaner.
     * A failure in an earlier step prevents the later ones from being started.
     *
     * @throws Exception whatever the individual start hooks throw
     */
    protected void startCompactThreads() throws Exception {
        startCompactorInitiator();
        startCompactorWorkers();
        startCompactorCleaner();
    }

    /**
     * Restarts the initiator when one exists but is no longer alive. The dead thread's
     * stop flag is raised before the replacement is started.
     */
    private void checkInitiator() throws Exception {
        if (this.initiatorThread != null && !this.initiatorThread.isAlive()) {
            this.initiatorThread.getStop().set(true);
            startCompactorInitiator();
        }
    }

    /**
     * Replaces, slot by slot, every worker that is no longer alive with a freshly
     * started one. Does nothing when there is no worker list.
     */
    private void checkWorker() throws Exception {
        if (this.workerThreads == null) {
            return;
        }
        int slot = 0;
        while (slot < this.workerThreads.size()) {
            CompactThread worker = this.workerThreads.get(slot);
            if (!worker.isAlive()) {
                CompactThread replacement = startCompactorWorker();
                this.workerThreads.set(slot, replacement);
            }
            slot++;
        }
    }

    /**
     * Restarts the cleaner when one exists but is no longer alive. The dead thread's
     * stop flag is raised before the replacement is started.
     */
    private void checkCleaner() throws Exception {
        if (this.cleanerThread != null && !this.cleanerThread.isAlive()) {
            this.cleanerThread.getStop().set(true);
            startCompactorCleaner();
        }
    }

    /**
     * Creates a ZooKeeper node with an open ACL unless something already exists at the path.
     *
     * @param str  path of the node
     * @param z    true for a persistent node, false for an ephemeral one
     * @param bArr initial node data
     * @throws Exception if the existence check or the creation fails
     */
    private void createZKNodeIfNeeded(String str, boolean z, byte[] bArr) throws Exception {
        boolean alreadyExists = this.zkClient.checkExists().forPath(str) != null;
        if (alreadyExists) {
            return;
        }
        LOG.info("create orc compact root node in zookeeper");
        CreateMode mode;
        if (z) {
            mode = CreateMode.PERSISTENT;
        } else {
            mode = CreateMode.EPHEMERAL;
        }
        this.zkClient.create().withMode(mode).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(str, bArr);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tears down all ZooKeeper state after a connection loss and stops the compact threads,
     * so that the run() loop can start again from scratch.
     *
     * <p>Every cleanup step is attempted independently and failures are logged. Previously
     * one failure (for example closing the path cache, or an interrupt while waiting for
     * the listener executor) skipped the remaining steps, which left the ZooKeeper client
     * open while its reference was dropped.
     */
    public void resetStatus() {
        LOG.info("Lost zk connection, try to reset compact service, check whether hive.zookeeper.session.timeout set too small");
        boolean interrupted = false;

        CompactorInterProcessMutex lock = this.compactLock;
        if (lock != null) {
            try {
                // NOTE(review): this may run on a thread other than the one that acquired the
                // lock, in which case the release is expected to fail — confirm with Curator docs.
                lock.release();
            } catch (Throwable t) {
                LOG.warn("Fail to release zk lock", t);
            }
        }

        if (this.addLockRevokeListener) {
            PathChildrenCache cache = this.pathChildrenCache;
            if (cache != null) {
                try {
                    cache.close();
                } catch (Throwable t) {
                    LOG.warn("Fail to close lock path cache", t);
                }
            }
            ExecutorService listenerExecutor = this.executors;
            if (listenerExecutor != null) {
                try {
                    if (!listenerExecutor.isShutdown()) {
                        listenerExecutor.shutdown();
                    }
                    if (!listenerExecutor.isTerminated()) {
                        listenerExecutor.awaitTermination(10L, TimeUnit.MINUTES);
                    }
                } catch (InterruptedException e) {
                    // Finish the cleanup first; the interrupt is restored at the end.
                    interrupted = true;
                    LOG.warn("Interrupted while waiting for lock path listener executor to terminate", e);
                } catch (Throwable t) {
                    LOG.warn("Fail to shut down lock path listener executor", t);
                }
            }
        }

        CuratorFramework client = this.zkClient;
        if (client != null) {
            try {
                client.close();
            } catch (Throwable t) {
                LOG.warn("Fail to close zk client", t);
            }
        }

        this.compactLock = null;
        this.pathChildrenCache = null;
        this.executors = null;
        this.zkClient = null;
        this.zkStarted = false;
        this.isCompactProcess = false;
        this.acquire = false;
        stopCompactThreads();

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the initiator, the cleaner and every worker that is currently set, and clears
     * the corresponding fields afterwards.
     */
    private void stopCompactThreads() {
        CompactThread initiator = this.initiatorThread;
        if (initiator != null) {
            initiator.close();
            this.initiatorThread = null;
        }

        CompactThread cleaner = this.cleanerThread;
        if (cleaner != null) {
            cleaner.close();
            this.cleanerThread = null;
        }

        List<CompactThread> workers = this.workerThreads;
        if (workers != null) {
            for (CompactThread worker : workers) {
                worker.close();
            }
            this.workerThreads = null;
        }
    }
}
