|
@@ -20,6 +20,7 @@ package org.maxkey.persistence.kafka;
|
|
|
import java.util.UUID;
|
|
|
|
|
|
import org.maxkey.configuration.ApplicationConfig;
|
|
|
+import org.maxkey.pretty.PrettyFactory;
|
|
|
import org.maxkey.util.DateUtils;
|
|
|
import org.maxkey.util.JsonUtils;
|
|
|
import org.slf4j.Logger;
|
|
@@ -30,7 +31,6 @@ import org.springframework.stereotype.Component;
|
|
|
|
|
|
@Component
|
|
|
public class KafkaPersistService {
|
|
|
-
|
|
|
private static final Logger _logger = LoggerFactory.getLogger(KafkaPersistService.class);
|
|
|
|
|
|
@Autowired
|
|
@@ -58,29 +58,24 @@ public class KafkaPersistService {
|
|
|
* @param actionType CREATE UPDATE DELETE
|
|
|
*/
|
|
|
public void send(String topic,Object content,String actionType) {
|
|
|
- //config.identity.kafkasupport , if true
|
|
|
+ //maxkey.server.kafka.support , if true
|
|
|
if(applicationConfig.isKafkaSupport()) {
|
|
|
- KafkaMessage message = new KafkaMessage();
|
|
|
- //message id is uuid
|
|
|
- message.setMsgId(UUID.randomUUID().toString());
|
|
|
- message.setActionType(actionType);
|
|
|
- message.setTopic(topic);
|
|
|
- //send to kafka time
|
|
|
- message.setSendTime(DateUtils.getCurrentDateTimeAsString());
|
|
|
- //content Object to json message content
|
|
|
- message.setContent(JsonUtils.gson2Json(content));
|
|
|
+ KafkaMessage message =
|
|
|
+ new KafkaMessage(
|
|
|
+ topic, //kafka TOPIC
|
|
|
+ actionType, //action of content
|
|
|
+ DateUtils.getCurrentDateTimeAsString(), //send to kafka time
|
|
|
+ UUID.randomUUID().toString(), //message id as uuid
|
|
|
+ content //content Object to json message content
|
|
|
+ );
|
|
|
String msg = JsonUtils.gson2Json(message);
|
|
|
- _logger.debug("send message = {}", msg);
|
|
|
- //通过线程发送Kafka消息
|
|
|
+ //sand msg to Kafka topic
|
|
|
KafkaProvisioningThread thread =
|
|
|
new KafkaProvisioningThread(kafkaTemplate,topic,msg);
|
|
|
-
|
|
|
thread.start();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
/**
|
|
|
* KafkaProvisioningThread for send message
|
|
|
*
|
|
@@ -97,20 +92,16 @@ public class KafkaPersistService {
|
|
|
KafkaTemplate<String, String> kafkaTemplate,
|
|
|
String topic,
|
|
|
String msg) {
|
|
|
-
|
|
|
this.kafkaTemplate = kafkaTemplate;
|
|
|
this.topic = topic;
|
|
|
this.msg = msg;
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- _logger.trace("send to Message Queue ...");
|
|
|
+ _logger.debug("send message \n{}" , PrettyFactory.getJsonPretty().format(msg));
|
|
|
kafkaTemplate.send(topic, msg);
|
|
|
- _logger.trace("send to Message Queue finished .");
|
|
|
+ _logger.debug("send to Message Queue finished .");
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
-
|
|
|
}
|