第九次课:消息队列发送邮件
分类: springboot 专栏: 在线教育项目实战 标签: 消息队列发邮件
2023-05-15 16:13:44 376浏览
消息队列发邮件
背景:禁用用户功能
依赖
<!--mq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--java-mail--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <!--thymeleaf--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency>
相关配置
spring: # 邮件配置 mail: # 邮件服务器地址 host: smtp.qq.com # 协议 protocol: smtp # 编码格式 default-encoding: utf-8 # 授权码(在邮箱开通服务时获取) password: jypzlyqcnctjggbc # 发送者邮箱地址 username: jf3q@qq.com # 端口(不同邮箱端口号不同) port: 25 # rabbitmq配置 rabbitmq: # 用户名 username: guest # 密码 password: guest # 服务器地址 host: 192.168.56.16 # 端口 port: 5672 #消息确认回调 publisher-confirm-type: correlated #消息失败回调 publisher-returns: true listener: simple: #开启手动确认 acknowledge-mode: manual
//消息投递成功 Integer SUCCESS = 1; //消息投递失败 Integer FAILURE = 2; //最大重试次数 Integer MAX_TRY_COUNT = 3; //消息超时时间 Integer MSG_TIMEOUT = 1; //队列 String MAIL_QUEUE_NAME = "mail.queue";//禁用邮件的消息队列 //交换机 String MAIL_EXCHANGE_NAME = "mail.exchange"; //路由键 String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
@Bean public Queue queue(){ return new Queue(SysConstant.MAIL_QUEUE_NAME); }
@Configuration @Slf4j public class RabbitMqConfig { @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private EduMailLogService mailLogService; @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); /** * 消息确认回调,确认消息是否到达broker * data:消息唯一标识 * ack:确认结果 * cause:失败原因 */ rabbitTemplate.setConfirmCallback((data,ack,cause)->{ String msgId = data.getId(); if(ack){ log.info("{}=======>消息发送成功",msgId); mailLogService.update(new UpdateWrapper<EduMailLog>().set("status",1).eq("id",msgId)); }else { log.error("{}=======>消息发送失败",msgId); } }); /** * 消息失败回调,比如router不到queue时回调 * msg:消息主题 * repCode:响应码 * repText:相应描述 * exchange:交换机 * routingkey:路由键 */ rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{ log.error("{}=======>消息发送queue时失败",msg.getBody()); }); return rabbitTemplate; } @Bean public Queue queue(){ return new Queue(SysConstant.MAIL_QUEUE_NAME); } @Bean public DirectExchange directExchange(){ return new DirectExchange(SysConstant.MAIL_EXCHANGE_NAME); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(directExchange()).with(SysConstant.MAIL_ROUTING_KEY_NAME); } }
简单模式
不考虑消息重复消费问题,也不考虑消息生产者投递队列可能失败的情况
- 消息生产者代码
//发送邮件给该用户 通过消息队列 EduMember member = this.getById(eduMember.getId()); rabbitTemplate.convertAndSend(SysConstant.MAIL_QUEUE_NAME,member);
- 消息消费者
@Resource private JavaMailSender javaMailSender; @Resource private MailProperties mailProperties; @Resource private TemplateEngine templateEngine; @RabbitListener(queues = SysConstant.MAIL_QUEUE_NAME) public void handler(EduMember member){ //发件人 try { MimeMessage mimeMessage =javaMailSender.createMimeMessage(); MimeMessageHelper helper= new MimeMessageHelper(mimeMessage); helper.setFrom(mailProperties.getUsername()); //收件人 helper.setTo(member.getEmail());//万一地址写的有问题 //主题 helper.setSubject("禁用通知邮件"); //发送日期 helper.setSentDate(new Date()); //邮件内容 Context context = new Context(); context.setVariable("nickname", member.getNickname()); context.setVariable("reason", member.getReason()); String mail = templateEngine.process("mail", context); helper.setText(mail, true); //发送邮件 javaMailSender.send(mimeMessage); log.info("邮件发送成功"); } catch (MessagingException e) { e.printStackTrace(); log.info("邮件发送失败:{}",e.getMessage()); } }
消息的可靠性解决方案
怎么保证生产端的消息可靠性的投递。
- 方式一消息落库,对消息状态打标
实现流程:
1.发送消息时,将当前消息数据存入数据库,投递状态为消息投递中
2.rabbitmq开启消息确认回调机制。确认成功,更新投递状态为消息投递成功
3.开启定时任务,重新投递失败的消息。重试超过3次,更新投递状态为投递失败
缺点:多次操作MySQL数据库
- 方式二:消息延迟投递,做二次确认,回调检查
实现流程:
业务数据入库(不是消息入库哦)
生产者第一次发送消息
延迟投递——第二次发送消息(有个时间差)
消费者收到消息后发个确认消息给消息队列
消息队列专门的回调服务监听到确认信息就把消息放到消息数据库里
如果监听到延迟投递的第二次消息的话,就去数据库检查是否已经有确认信息
如果没收到确认信息就重新第一步开始
消息可靠性java代码——消息回调
- 配置类开启确认回调
上文的相关配置已提到,这里不赘述
- 修改springboot配置文件
- 开启定时任务
/** * 邮件发送定时任务 * 10秒执行一次 */ @Scheduled(cron = "0/10 * * * * ?") public void mailTask(){ List<EduMailLog> list = mailLogService.list(new QueryWrapper<EduMailLog>().eq("status", 0).lt("try_time", LocalDateTime.now())); list.forEach(mailLog -> { //如果重试次数超过3次,更新状态为投递失败,不再重试 if (3<=mailLog.getCount()){ mailLogService.update(new UpdateWrapper<EduMailLog>().set("status",2).eq("id",mailLog.getId())); } mailLogService.update(new UpdateWrapper<EduMailLog>().set("count",mailLog.getCount()+1).set("try_time",LocalDateTime.now().plusMinutes(SysConstant.MSG_TIMEOUT)).eq("id",mailLog.getId())); EduMember member = memberService.getById(mailLog.getMid()); //发送消息 rabbitTemplate.convertAndSend(SysConstant.MAIL_EXCHANGE_NAME,SysConstant.MAIL_ROUTING_KEY_NAME,member,new CorrelationData(mailLog.getId())); }); }
消费者的幂等性
重复发一条消息,怎么保证消费者只消费一次。
- 修改springboot配置文件
- 修改消息消费者监听代码
@RabbitListener(queues = SysConstant.MAIL_QUEUE_NAME) public void handler(Message message, Channel channel) { EduMember member = (EduMember) message.getPayload(); MessageHeaders headers = message.getHeaders(); //消息序号 long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG); String msgId = (String) headers.get("spring_returned_message_correlation"); HashOperations hashOperations = redisTemplate.opsForHash(); try { if (hashOperations.entries("mail_log").containsKey(msgId)){ log.error("消息已经被消费=============>{}",msgId); /** * 手动确认消息 * tag:消息序号 * multiple:是否确认多条 */ channel.basicAck(tag,false); return; } MimeMessage msg = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(msg); //发件人 helper.setFrom(mailProperties.getUsername()); //收件人 helper.setTo(member.getEmail()); //主题 helper.setSubject("禁用邮件"); //发送日期 helper.setSentDate(new Date()); //邮件内容 Context context = new Context(); context.setVariable("nickname", member.getNickname()); context.setVariable("reason", member.getReason()); String mail = templateEngine.process("mail", context); helper.setText(mail, true); //发送邮件 javaMailSender.send(msg); log.info("邮件发送成功"); //将消息id存入redis hashOperations.put("mail_log", msgId, "OK"); //手动确认消息 channel.basicAck(tag, false); } catch (Exception e) { /** * 手动确认消息 * tag:消息序号 * multiple:是否确认多条 * requeue:是否退回到队列 */ try { channel.basicNack(tag,false,true); } catch (IOException ex) { log.error("邮件发送失败=========>{}", e.getMessage()); } log.error("邮件发送失败=========>{}", e.getMessage()); } }
好博客就要一起分享哦!分享海报
他的专栏
他感兴趣的技术