/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.api.internal.tmc;

import com.taobao.api.internal.tmc.KeySelector;
import com.taobao.api.internal.tmc.Message;
import com.taobao.api.internal.tmc.TmcClient;
import com.taobao.api.internal.tmc.TmcHandler;
import com.taobao.api.internal.toplink.endpoint.EndpointContext;
import com.taobao.api.internal.util.NamedThreadFactory;
import com.taobao.api.internal.util.json.JSONValidatingReader;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DuplicateRemoverTmcHandler
extends TmcHandler {
    private static final Log log = LogFactory.getLog(DuplicateRemoverTmcHandler.class);
    private static final Log statlog = LogFactory.getLog((String)DuplicateRemoverTmcHandler.class.getSimpleName());
    private static final long SCHEDULE_DELAY = 10L;
    private static final long SCHEDULE_PERIOD = 100L;
    private static final long REMOVE_DUPLICATE_PERIOD = 1000L;
    private ConcurrentHashMap<String, Message> msgMap;
    private LinkedBlockingQueue<Pair<String, Long>> msgKeyQueue;
    private KeySelector keySelector;
    private ScheduledExecutorService scheduledService;
    private ScheduledFuture<?> scheduledFuture;

    public DuplicateRemoverTmcHandler(TmcClient tmcClient) {
        super(tmcClient);
        this.keySelector = tmcClient.getKeySelector() == null ? new MsgKeySelector() : tmcClient.getKeySelector();
        this.msgMap = new ConcurrentHashMap();
        this.msgKeyQueue = new LinkedBlockingQueue(tmcClient.getQueueSize() * 2);
        this.scheduledService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("tmc-duplicate-remover"));
        this.scheduledFuture = this.scheduledService.scheduleWithFixedDelay(new MsgScheduleTask(), 10L, 100L, TimeUnit.MILLISECONDS);
    }

    public void onMessage(EndpointContext context) throws Exception {
        Message msg;
        String key;
        Map<String, Object> rawMsg = context.getMessage();
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("onMessage from %s: %s", context.getMessageFrom(), rawMsg));
        }
        if ((key = this.keySelector.selectKey(msg = this.parse(rawMsg))) == null) {
            super.handleMessage(msg, false);
        } else if (!this.put(key, msg)) {
            super.handleMessage(msg, true);
            this.log(key, msg.getId());
        }
    }

    private void log(String key, Long msgId) {
        StringBuilder buf = new StringBuilder();
        buf.append(System.currentTimeMillis()).append(",");
        buf.append(this.tmcClient.getAppKey()).append(",");
        buf.append(this.tmcClient.getGroupName()).append(",");
        buf.append(msgId).append(",");
        buf.append(key);
        statlog.fatal((Object)buf.toString());
    }

    public void close() {
        super.close();
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
            this.scheduledService.shutdown();
            this.scheduledService = null;
        }
    }

    private boolean put(String key, Message message) throws InterruptedException {
        Message obj = this.msgMap.putIfAbsent(key, message);
        if (obj == null) {
            this.msgKeyQueue.put(new Pair<String, Long>(key, System.currentTimeMillis()));
            return true;
        }
        return false;
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class Pair<K, V> {
        private K key;
        private V value;

        public K getKey() {
            return this.key;
        }

        public void setKey(K key) {
            this.key = key;
        }

        public V getValue() {
            return this.value;
        }

        public void setValue(V value) {
            this.value = value;
        }

        public Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private class MsgScheduleTask
    extends TimerTask {
        private MsgScheduleTask() {
        }

        public void run() {
            while (true) {
                Pair keyPair = null;
                try {
                    keyPair = (Pair)DuplicateRemoverTmcHandler.this.msgKeyQueue.poll();
                    if (keyPair == null) break;
                    DuplicateRemoverTmcHandler.this.handleMessage((Message)DuplicateRemoverTmcHandler.this.msgMap.remove(keyPair.getKey()), false);
                    if ((Long)keyPair.getValue() + 1000L <= System.currentTimeMillis()) continue;
                }
                catch (Exception e) {
                    log.error((Object)("handle message fail: %s" + (String)keyPair.getKey()), (Throwable)e);
                    continue;
                }
                break;
            }
        }
    }

    private class MsgKeySelector
    implements KeySelector {
        private MsgKeySelector() {
        }

        public String selectKey(Message message) {
            String topic = message.getTopic();
            String key = null;
            key = topic.startsWith("taobao_trade") || topic.equals("taobao_datapush_SynTrade") ? "trade_" + this.getId(message, "tid") : (topic.startsWith("taobao_item") || topic.equals("taobao_datapush_SynItem") ? "item_" + this.getId(message, "num_iid") : (topic.startsWith("taobao_refund") ? "refund_" + this.getId(message, "refund_id") : null));
            return key;
        }

        private String getId(Message message, String field) {
            JSONValidatingReader reader = new JSONValidatingReader();
            message.setContentMap((Map)reader.read(message.getContent()));
            Object id = message.getContentMap().get(field);
            return String.valueOf(id);
        }
    }
}

