此项目为计划做的金融版抖音后端。技术栈springcloudalibaba、springcloudgateway、sw.zip
立即下载
资源介绍:
此项目为计划做的金融版抖音后端。技术栈springcloudalibaba、springcloudgateway、sw
package com.x.provider.mc.service.impl;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.x.core.utils.BeanUtil;
import com.x.core.utils.CompareUtils;
import com.x.core.utils.JsonUtil;
import com.x.core.web.page.PageDomain;
import com.x.core.web.page.PageHelper;
import com.x.core.web.page.PageLimit;
import com.x.core.web.page.PageList;
import com.x.provider.api.customer.enums.CustomerOptions;
import com.x.provider.api.customer.model.dto.ListSimpleCustomerRequestDTO;
import com.x.provider.api.customer.model.dto.SimpleCustomerDTO;
import com.x.provider.api.customer.service.CustomerRpcService;
import com.x.provider.api.mc.constants.McEventTopic;
import com.x.provider.api.mc.enums.ConversationType;
import com.x.provider.api.mc.enums.GroupType;
import com.x.provider.api.mc.model.dto.SendMessageRequestDTO;
import com.x.provider.api.mc.model.event.MessageEvent;
import com.x.provider.api.mc.model.protocol.CommonMessageBodyDTO;
import com.x.provider.api.mc.model.protocol.MessageType;
import com.x.provider.mc.configure.ApplicationConfig;
import com.x.provider.mc.mapper.*;
import com.x.provider.mc.model.bo.GetConversationRequestBO;
import com.x.provider.mc.model.query.MessageConversationQuery;
import com.x.provider.mc.model.vo.MarkMessageAsReadRequestVO;
import com.x.provider.mc.model.domain.ConversationId;
import com.x.provider.mc.model.domain.*;
import com.x.provider.mc.model.bo.ConversationBO;
import com.x.provider.mc.model.bo.MessageBO;
import com.x.provider.mc.model.query.ConversationQuery;
import com.x.provider.mc.model.query.GroupQuery;
import com.x.provider.mc.model.query.MessageQuery;
import com.x.provider.mc.service.MessageEngineService;
import com.x.provider.mc.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MessageServiceImpl implements MessageService {
private static final String CONVERSATION_ID_SPLITTER = "_";
private static final String CONVERSATION_ID_FORMATTER = "{}" + CONVERSATION_ID_SPLITTER +"{}";
public static final String DEFAULT_ALERT_MESSAGE = "欢迎新同学";
private final MessageMapper messageMapper;
private final MessageConversationMapper messageConversationMapper;
private final GroupMapper groupMapper;
private final ConversationMapper conversationMapper;
private final Executor executor;
private final MessageEngineService messageEngineService;
private final ApplicationConfig applicationConfig;
private final GroupMemberMapper groupMemberMapper;
private final KafkaTemplate kafkaTemplate;
private final CustomerRpcService customerRpcService;
public MessageServiceImpl(MessageMapper messageMapper,
@Qualifier("mcDefaultExecutor") Executor executor,
MessageEngineService messageEngineService,
ApplicationConfig applicationConfig,
MessageConversationMapper messageOwnerMapper,
GroupMapper groupMapper,
ConversationMapper conversationMapper,
GroupMemberMapper groupMemberMapper,
KafkaTemplate kafkaTemplate,
CustomerRpcService customerRpcService){
this.messageMapper = messageMapper;
this.executor = executor;
this.messageEngineService = messageEngineService;
this.applicationConfig = applicationConfig;
this.messageConversationMapper = messageOwnerMapper;
this.groupMapper = groupMapper;
this.conversationMapper = conversationMapper;
this.groupMemberMapper = groupMemberMapper;
this.kafkaTemplate = kafkaTemplate;
this.customerRpcService = customerRpcService;
}
@Override
public PageList listMessage(String conversationIdStr, Long sessionCustomerId, PageDomain pageDomain) {
ConversationId conversationId = toConversationId(conversationIdStr);
MessageConversationQuery messageConversationQuery = MessageConversationQuery.builder()
.conversationId(conversationIdStr)
.ownerCustomerId(sessionCustomerId)
.ltId(pageDomain.getCursor())
.pageLimit(new PageLimit(null, pageDomain.getPageSize()))
.build();
List messageConversations = listMessageConversation(messageConversationQuery);
if (messageConversations.isEmpty()){
return new PageList<>();
}
messageConversations.sort(Comparator.comparing(MessageConversation::getId));
final Map messageMap = listMessage(MessageQuery.builder().idList(messageConversations.stream().map(item -> item.getMessageId()).collect(Collectors.toList())).build()).stream()
.collect(Collectors.toMap(item -> item.getId(), item -> item));
List messageList = new ArrayList<>(messageMap.size());
messageConversations.forEach(item -> {
final Message message = messageMap.get(item.getMessageId());
if (message != null){
messageList.add(message);
}
});
if (CollectionUtils.isEmpty(messageList)){
return new PageList<>();
}
return new PageList<>(messageList, pageDomain.getPageSize(), CollectionUtils.firstElement(messageConversations).getId());
}
@Override
public MessageBO sendMessage(SendMessageRequestDTO sendMessageAO) {
Message message = BeanUtil.prepare(sendMessageAO, Message.class);
message.setCreatedOnUtc(new Date());
if (!sendMessageAO.getOnlineUserOnly()){
messageMapper.insert(message);
}
MessageEvent messageEvent = BeanUtil.prepare(message, MessageEvent.class);
messageEvent.setEventType(MessageEvent.EventTypeEnum.SEND.getValue());
messageEvent.setOnlineUserOnly(sendMessageAO.getOnlineUserOnly());
this.kafkaTemplate.send(McEventTopic.TOPIC_NAME_SEND_MESSAGE, StrUtil.format("{}:{}",sendMessageAO.getToCustomerId() + sendMessageAO.getToGroupId()), messageEvent);
return buildQuery(message);
}
@Override
public void onSendMessage(MessageEvent messageEvent){
if (!MessageEvent.EventTypeEnum.SEND.getValue().equals(messageEvent.getEventType())){
return;
}
Message message = BeanUtil.prepare(messageEvent, Message.class);
if (messageEvent.getOnlineUserOnly() != null && messageEvent.getOnlineUserOnly()){
messageEngineService.sendMessage(null, buildQuery(message));
return;
}
saveMessageConversation(message);
ConversationType conversationType = buildQuery(message.getToCustomerId(), message.getToGroupId());
List conversationList = saveConversation(message, conversationType.getValue()).stream()
.filter(item -> !item.getOwnerCustomerId().equals(message.getFromCustomerId())).collect(Collectors.toList());
if (conversationList.isEmpty()){
return;
}
List conversationDTO = prepare(conversationList);
MessageBO messageDTO = buildQuery(message);
conversationDTO.forEach(item -> {
messageEngineService.sendMessage(item, messageDTO);
});
}
@Override
public Long getTotalUnreadMessageCount(Long ownerCustomerId){
return conversationMapper.sumUnreadCount(ownerCustomerId);
资源文件列表:
此项目为计划做的金融版抖音后端。技术栈springcloudalibaba、springcloudgateway、sw.zip 大约有837个文件