KafkaReadProcessor.java 7.2 KB
package com.tianbo.messagebus.service;

import com.alibaba.fastjson.JSON;
import com.tianbo.messagebus.config.KafkaConsumerConfig;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.kafka.ConsumersCache;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;

@Service
@Slf4j
@EnableAsync
public class KafkaReadProcessor {

    /**
     * 账号名/TOPIC名称
     */
    @Value("${message-bus.auth.username}")
    private String userName;

    @Value("${message-bus.consumer-group-id}")
    private String groupName;

    @Autowired
    KafkaSendApi kafkaSendApi;

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${kafka.max-poll-records}")
    private String maxPollRecords;
    /**
     * 失败重发请求次数
     */
    private static final int RETRY_TIMES= 10;

    /**
     * 多线程中consumer锁
     */

    @Async
    @Scheduled(fixedRate = 3000)
    public void msgProcess(){
        try{
            if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(groupName)){
                log.info("1.【开始】用[{}]组读取topic[{}]->",groupName,userName);
                Map<String, Object> map=new KafkaConsumerConfig().consumerConfigs();
                map.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
                map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
                map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
                log.info("----2.消费者组为:{}----",groupName);

                //针对三个partition创建三个消费者,并缓存
                for (int i = 1; i <=3 ; i++) {
                    KafkaConsumer<String, String> consumer;
                    String consumerName = userName+"-"+i;

                    //消费锁判定
                    Map map_lock = ConsumersCache.getConsumerLock();
                    if (map_lock.containsKey(consumerName) && ConsumersCache.getLockState(consumerName)){
                        log.info("[CONSUMER-LOCK-{}] 消费状态为锁定,正在消费",consumerName);
                        return;
                    }else {
                        log.info("[CONSUMER-LOCK-{}] 消费状态为正常,可以消费",consumerName);
                        ConsumersCache.lock(consumerName);
                    }


                    if (ConsumersCache.getConsumerMap().containsKey(consumerName)){
                        consumer = ConsumersCache.consumerMap.get(consumerName);
                        log.info("[loop-start]3.从缓存中获取到消费者:{}的消费者信息[{}]。",consumerName,consumer);
                    }else {
                        map.put(ConsumerConfig.CLIENT_ID_CONFIG,consumerName);
                        consumer =new KafkaConsumer<String, String>(map);
                        ConsumersCache.consumerMap.put(consumerName,consumer);
                        log.info("[CONSUMER] 3.缓存中没有消费者{}的信息,创建新的消费者信息",consumerName);
                    }

                    consumer.subscribe(Arrays.asList(userName));
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
                    log.info("----[CONSUMER] 4.消费者:{}此次成功消费数据{}条----",consumerName,records.count());

                    if(!records.isEmpty()){
                        for (ConsumerRecord<String, String> record : records) {
                            String msg  = record.value();
                            log.info("[loop-start]5.开始处理消息{}",msg);
                            MSGS msgs = transMsg(msg);

                            boolean sendResult = sendmsg(msgs);
                            if(!sendResult){
                                log.error("!!!!!!消息--->{}<---转发失败!!!!!!,尝试重发",msg);
                                //todo:消息备份或者重发?
                                reTrySend(msgs);
                            }
                        }
                        consumer.commitSync();
                        log.info("[CONSUMER] 消费者{}-{}消费提交成功",consumerName,groupName);

                    }else {
                        log.info("----[END]5.消费者的TOPIC没有新的消费数据即将返回----");
                    }
                    ConsumersCache.unlock(consumerName);
                }

            }

        }catch (Exception e){
            e.printStackTrace();
            log.info("转发出错,{}",e.toString());
            log.error("转发出错,{}",e.toString());
            ConsumersCache.unlock(userName+"-"+1);
            ConsumersCache.unlock(userName+"-"+2);
            ConsumersCache.unlock(userName+"-"+3);
        }

    }

    public MSGS transMsg(String msg){
        MSGS rootJson = JSON.parseObject(msg,MSGS.class);
        rootJson.getMSG().getHEADER().setSNDR(userName);
//        JSONObject msgJson = rootJson.getJSONObject("MSG");
//        JSONObject body = msgJson.getJSONObject("BODY");
//
//        HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
//        msgHeader.setSNDR(userName);
//
//        MSG transMsg=  new MSG();
//        transMsg.setHEADER(msgHeader);
//        transMsg.setBODY(body);
//
//        MSGS msgs = new MSGS();
//        msgs.setMSG(transMsg);
        return rootJson;
    }

    public boolean sendmsg(MSGS msgs){
        try {
            ResultJson response = kafkaSendApi.send(msgs);

            if ("200".equals(response.getCode())){
                log.info("[SEND-PRODUCT]………………消息发送成功{}………………",response.toString());
                return true;
            }
            log.info("[SEND-PRODUCT]400-消息发送失败->{}",response.toString());
        }catch (FeignException ex){
            log.error("[SEND-PRODUCT] 发送服务调用失败-->>{}",ex.toString());
        }
        return false;
    }

    /**
     * feign重发消息
     */
    public boolean reTrySend(MSGS msgs) throws InterruptedException {
        log.error("***进入重发***");
        boolean flag = false;
        int i = 0;
        while (true){
            Thread.sleep(1000);
            i++;
            boolean sendResult = sendmsg(msgs);
            if (sendResult){
                log.error("[RESEND-PRODUCT]***重发成功,重发次数({})***",i);
                log.info("[RESEND-PRODUCT]***重发成功,重发次数({})***",i);
                flag = true;
                break;
            }
        }
        return flag;
    }
}