package jd.dd.network.tcp;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.PriorityBlockingQueue;
import jd.dd.network.tcp.MessageConsumer;
import jd.dd.network.tcp.message.MessageUtil;
import jd.dd.network.tcp.protocol.BaseMessage;
import jd.dd.v3.proto.PacketProto;
import jd.dd.waiter.util.LogUtils;

/* loaded from: classes9.dex */
public class MessageProducer extends Thread implements MessageConsumer.IListener {
    private static int CURRENT_MESSAGE_COUNT = 0;
    public static final int DEFAULT_CONSUMER_SIZE = 4;
    private final BlockingQueue<Object> mCacheQueue;
    private MessageConsumer[] mConsumers;
    private IListener mListener;
    private final BlockingQueue<BaseMessage> mMessageQueue;
    private final String TAG = MessageProducer.class.getSimpleName() + " : " + hashCode();
    private volatile boolean mQuit = false;

    /* loaded from: classes9.dex */
    public interface IListener {
        void onMessageReady(BaseMessage baseMessage);
    }

    public MessageProducer(IListener iListener) {
        int i10 = 0;
        setName("MessageProducer");
        this.mListener = iListener;
        this.mCacheQueue = new LinkedBlockingDeque();
        this.mMessageQueue = new PriorityBlockingQueue();
        this.mConsumers = new MessageConsumer[4];
        while (true) {
            MessageConsumer[] messageConsumerArr = this.mConsumers;
            if (i10 >= messageConsumerArr.length) {
                return;
            }
            int i11 = i10 + 1;
            messageConsumerArr[i10] = new MessageConsumer(this, i11, this.mMessageQueue);
            this.mConsumers[i10].start();
            i10 = i11;
        }
    }

    private boolean isRunning() {
        return !this.mQuit;
    }

    public void add(Object obj) {
        if (obj == null) {
            return;
        }
        try {
            this.mCacheQueue.put(obj);
        } catch (InterruptedException e10) {
            e10.printStackTrace();
        }
    }

    @Override // jd.dd.network.tcp.MessageConsumer.IListener
    public void onMessageReady(BaseMessage baseMessage) {
        IListener iListener = this.mListener;
        if (iListener == null) {
            return;
        }
        iListener.onMessageReady(baseMessage);
    }

    public void quit() {
        LogUtils.d(this.TAG, "It is quiting ...");
        for (MessageConsumer messageConsumer : this.mConsumers) {
            messageConsumer.quit();
        }
        this.mQuit = true;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        BaseMessage baseMessage;
        LogUtils.d(this.TAG, "It is running now ...");
        while (isRunning()) {
            try {
                Object take = this.mCacheQueue.take();
                String str = null;
                if (take instanceof String) {
                    str = (String) take;
                    baseMessage = MessageUtil.castToLocalObject(str);
                } else if (take instanceof PacketProto.Packet) {
                    baseMessage = MessageUtil.castToLocalObject((PacketProto.Packet) take);
                    if (baseMessage != null) {
                        str = baseMessage.toJson();
                    }
                } else {
                    baseMessage = null;
                }
                CURRENT_MESSAGE_COUNT++;
                LogUtils.printAll(this.TAG, "NO. " + CURRENT_MESSAGE_COUNT + " message:" + str + " remain:" + this.mCacheQueue.size());
                if (baseMessage != null) {
                    this.mMessageQueue.put(baseMessage);
                } else {
                    LogUtils.e(this.TAG, "It is NOT a valid package from " + str);
                }
            } catch (InterruptedException unused) {
                LogUtils.d(this.TAG, "The thread is interrupted.");
            }
        }
        LogUtils.i(this.TAG, "It stops running.");
    }

    public void shutdown() {
        LogUtils.d(this.TAG, "It is shutting down ...");
        for (MessageConsumer messageConsumer : this.mConsumers) {
            messageConsumer.shutdown();
        }
        this.mQuit = true;
        interrupt();
        try {
            LogUtils.i(this.TAG, "Waiting for thread to quit.");
            join();
        } catch (InterruptedException e10) {
            e10.printStackTrace();
        }
        LogUtils.i(this.TAG, "It is destroyed.");
    }
}
