Files
whale-town-end/src/business/notice/notice.service.ts
moyin a2d630d864 feat:实现通知系统核心功能
- 添加通知实体和数据传输对象
- 实现通知服务层逻辑,支持创建、查询、标记已读
- 添加通知REST API控制器
- 实现WebSocket网关,支持实时通知推送
- 支持系统通知、用户通知、广播通知三种类型
- 支持定时通知功能,每分钟自动检查待发送通知
- 添加通知模块导出
2026-01-10 21:51:29 +08:00

145 lines
4.2 KiB
TypeScript

import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, LessThanOrEqual } from 'typeorm';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Notice, NoticeStatus, NoticeType } from './notice.entity';
import { CreateNoticeDto } from './dto/create-notice.dto';
import { NoticeGateway } from './notice.gateway';
@Injectable()
export class NoticeService {
private readonly logger = new Logger(NoticeService.name);
constructor(
@InjectRepository(Notice)
private readonly noticeRepository: Repository<Notice>,
private readonly noticeGateway: NoticeGateway,
) {}
async create(createNoticeDto: CreateNoticeDto): Promise<Notice> {
const notice = this.noticeRepository.create({
...createNoticeDto,
scheduledAt: createNoticeDto.scheduledAt ? new Date(createNoticeDto.scheduledAt) : null,
});
const savedNotice = await this.noticeRepository.save(notice);
// 如果没有设置计划时间,立即发送
if (!savedNotice.scheduledAt) {
await this.sendNotice(savedNotice);
}
return savedNotice;
}
async findAll(userId?: number): Promise<Notice[]> {
const query = this.noticeRepository.createQueryBuilder('notice');
if (userId) {
query.where('notice.userId = :userId OR notice.userId IS NULL', { userId });
}
return query.orderBy('notice.createdAt', 'DESC').getMany();
}
async findById(id: number): Promise<Notice> {
const notice = await this.noticeRepository.findOne({ where: { id } });
if (!notice) {
throw new NotFoundException(`Notice with ID ${id} not found`);
}
return notice;
}
async markAsRead(id: number, userId?: number): Promise<Notice> {
const notice = await this.findById(id);
// 检查权限:只能标记自己的通知或广播通知为已读
if (notice.userId && userId && notice.userId !== userId) {
throw new NotFoundException(`Notice with ID ${id} not found`);
}
notice.status = NoticeStatus.READ;
notice.readAt = new Date();
return this.noticeRepository.save(notice);
}
async getUserUnreadCount(userId: number): Promise<number> {
return this.noticeRepository.count({
where: [
{ userId, status: NoticeStatus.SENT },
{ userId: null, status: NoticeStatus.SENT }, // 广播通知
],
});
}
private async sendNotice(notice: Notice): Promise<void> {
try {
// 通过WebSocket发送通知
if (notice.userId) {
// 发送给特定用户
this.noticeGateway.sendToUser(notice.userId, {
type: 'notice',
data: notice,
});
} else {
// 广播通知
this.noticeGateway.broadcast({
type: 'notice',
data: notice,
});
}
// 更新状态
notice.status = NoticeStatus.SENT;
notice.sentAt = new Date();
await this.noticeRepository.save(notice);
this.logger.log(`Notice ${notice.id} sent successfully`);
} catch (error) {
this.logger.error(`Failed to send notice ${notice.id}:`, error);
notice.status = NoticeStatus.FAILED;
await this.noticeRepository.save(notice);
}
}
// 定时任务:每分钟检查需要发送的通知
@Cron(CronExpression.EVERY_MINUTE)
async handleScheduledNotices(): Promise<void> {
const now = new Date();
const pendingNotices = await this.noticeRepository.find({
where: {
status: NoticeStatus.PENDING,
scheduledAt: LessThanOrEqual(now),
},
});
for (const notice of pendingNotices) {
await this.sendNotice(notice);
}
if (pendingNotices.length > 0) {
this.logger.log(`Processed ${pendingNotices.length} scheduled notices`);
}
}
// 发送系统通知的便捷方法
async sendSystemNotice(title: string, content: string, userId?: number): Promise<Notice> {
return this.create({
title,
content,
type: NoticeType.SYSTEM,
userId,
});
}
// 发送广播通知的便捷方法
async sendBroadcast(title: string, content: string): Promise<Notice> {
return this.create({
title,
content,
type: NoticeType.BROADCAST,
});
}
}