package service

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"pay-bridge/internal/model"
	"pay-bridge/internal/repository"
)

// retryIntervals is the delivery schedule: nine delivery attempts in total
// (the first is sent immediately, followed by up to eight retries).
// Index 0 is the immediate first attempt; index i (i >= 1) is the delay
// applied before retry number i.
var retryIntervals = []time.Duration{
	0,
	15 * time.Second,
	30 * time.Second,
	1 * time.Minute,
	5 * time.Minute,
	30 * time.Minute,
	1 * time.Hour,
	6 * time.Hour,
	12 * time.Hour,
}

// maxRetry is the maximum number of retries after the initial attempt.
const maxRetry = 8

// NotifyService delivers trade notifications to downstream callback URLs and
// retries failed deliveries according to retryIntervals.
type NotifyService struct {
	notifyRepo *repository.NotifyLogRepository
	tradeRepo  *repository.TradeOrderRepository
	httpClient *http.Client
}

// NewNotifyService builds a NotifyService backed by the given repositories.
// httpTimeout is used as the overall timeout of every outgoing HTTP request.
func NewNotifyService(
	notifyRepo *repository.NotifyLogRepository,
	tradeRepo *repository.TradeOrderRepository,
	httpTimeout time.Duration,
) *NotifyService {
	return &NotifyService{
		notifyRepo: notifyRepo,
		tradeRepo:  tradeRepo,
		httpClient: &http.Client{Timeout: httpTimeout},
	}
}

// SendNotify performs the first delivery attempt of a notification for
// tradeNo to notifyURL.
//
// It returns an error when the payload cannot be built, or when the delivery
// failed and no notify record could be saved (so no retry can be scheduled).
// A failed delivery whose record was saved is scheduled for retry and
// reported as nil.
func (s *NotifyService) SendNotify(ctx context.Context, tradeNo string, notifyType model.NotifyType, notifyURL string) error {
	payload, err := s.buildPayload(ctx, tradeNo, notifyType)
	if err != nil {
		return err
	}

	// Persist the notify record before sending. A persistence failure is
	// logged but does not block the delivery attempt itself.
	entry := &model.NotifyLog{
		TradeNo:    tradeNo,
		NotifyType: notifyType,
		NotifyURL:  notifyURL,
		Status:     model.NotifyStatusPending,
		RetryCount: 0,
	}
	persisted := true
	if err := s.notifyRepo.Upsert(ctx, entry); err != nil {
		persisted = false
		slog.ErrorContext(ctx, "upsert notify log failed", "trade_no", tradeNo, "err", err)
	}

	resp, err := s.sendHTTP(ctx, notifyURL, payload)
	if err == nil && isSuccessResponse(resp) {
		if persisted {
			// NOTE(review): any result of MarkSuccess is not inspected here;
			// its signature is not visible in this file.
			s.notifyRepo.MarkSuccess(ctx, entry.ID, resp)
		}
		slog.InfoContext(ctx, "notify success", "trade_no", tradeNo, "type", notifyType)
		return nil
	}

	// First attempt failed: record the reason and schedule a retry.
	errMsg := resp
	if err != nil {
		errMsg = err.Error()
	}
	if !persisted {
		// Without a stored record the poller can never pick this notification
		// up again, so surface the failure instead of dropping it silently.
		return fmt.Errorf("notify %s failed and no retry could be scheduled: %s", tradeNo, errMsg)
	}

	// Measure the delay from the moment of failure, not from before the HTTP
	// call, so a slow downstream does not shorten the first retry interval.
	nextTime := time.Now().Add(retryIntervals[1])
	s.notifyRepo.IncrRetryCount(ctx, entry.ID, model.NotifyStatusRetry, &nextTime, errMsg)
	slog.WarnContext(ctx, "notify failed, scheduled retry", "trade_no", tradeNo, "next_retry", nextTime)
	return nil
}

// ProcessRetryQueue processes up to batchSize notify records that are due for
// retry (called by the poller). It returns the listing error, or ctx.Err()
// when the context is cancelled before the batch is finished.
func (s *NotifyService) ProcessRetryQueue(ctx context.Context, batchSize int) error {
	entries, err := s.notifyRepo.ListPendingRetry(ctx, time.Now(), batchSize)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		// Stop early on shutdown instead of firing requests that are
		// guaranteed to fail with a cancelled context.
		if err := ctx.Err(); err != nil {
			return err
		}
		s.processOne(ctx, entry)
	}
	return nil
}

// processOne performs one retry attempt for entry. On failure it schedules
// the next retry, or marks the record as given up once maxRetry retries have
// been used.
func (s *NotifyService) processOne(ctx context.Context, entry *model.NotifyLog) {
	payload, err := s.buildPayload(ctx, entry.TradeNo, entry.NotifyType)
	if err != nil {
		slog.ErrorContext(ctx, "build payload failed", "trade_no", entry.TradeNo, "err", err)
		return
	}

	resp, err := s.sendHTTP(ctx, entry.NotifyURL, payload)
	if err == nil && isSuccessResponse(resp) {
		s.notifyRepo.MarkSuccess(ctx, entry.ID, resp)
		slog.InfoContext(ctx, "notify retry success", "trade_no", entry.TradeNo, "retry_count", entry.RetryCount)
		return
	}

	// A failure caused by our own cancellation (shutdown) says nothing about
	// the downstream, so it must not consume one of the retry slots.
	if ctx.Err() != nil {
		return
	}

	errMsg := resp
	if err != nil {
		errMsg = err.Error()
	}

	// nextRetryIdx is the ordinal of the retry that would follow this one.
	// Give up only after all maxRetry retries have been used; the bounds
	// check keeps the lookup safe if the schedule is ever shortened.
	nextRetryIdx := entry.RetryCount + 1
	if nextRetryIdx > maxRetry || nextRetryIdx >= len(retryIntervals) {
		s.notifyRepo.MarkGiveup(ctx, entry.ID)
		slog.WarnContext(ctx, "notify giveup after max retries", "trade_no", entry.TradeNo, "err", errMsg)
		return
	}

	nextTime := time.Now().Add(retryIntervals[nextRetryIdx])
	s.notifyRepo.IncrRetryCount(ctx, entry.ID, model.NotifyStatusRetry, &nextTime, errMsg)
}

// buildPayload loads the order identified by tradeNo and serialises the
// notification body as JSON. channel_trade_no and pay_time are included only
// when the order has them set.
func (s *NotifyService) buildPayload(ctx context.Context, tradeNo string, notifyType model.NotifyType) ([]byte, error) {
	order, err := s.tradeRepo.GetByTradeNo(ctx, tradeNo)
	if err != nil {
		// Keep the underlying cause so a repository failure is not mistaken
		// for a missing order.
		return nil, fmt.Errorf("get order %s: %w", tradeNo, err)
	}
	if order == nil {
		return nil, fmt.Errorf("order not found: %s", tradeNo)
	}

	payload := map[string]any{
		"trade_no":          order.TradeNo,
		"merchant_order_no": order.MerchantOrderNo,
		"app_id":            order.AppID,
		"pay_method":        order.PayMethod,
		"amount":            order.Amount,
		"status":            order.Status,
		"notify_type":       notifyType,
		"timestamp":         time.Now().Unix(),
	}
	if order.ChannelTradeNo != "" {
		payload["channel_trade_no"] = order.ChannelTradeNo
	}
	if order.PayTime != nil {
		payload["pay_time"] = order.PayTime.Unix()
	}
	return json.Marshal(payload)
}

// sendHTTP POSTs payload as JSON to notifyURL and returns at most the first
// 512 bytes of the response body. Any status other than 200 OK is returned
// as an error that includes the status code and the body prefix.
func (s *NotifyService) sendHTTP(ctx context.Context, notifyURL string, payload []byte) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, notifyURL, bytes.NewReader(payload))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// A read error is deliberately ignored: whatever prefix of the body was
	// received is still evaluated and recorded.
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status %d: %s", resp.StatusCode, body)
	}
	return string(body), nil
}

// isSuccessResponse reports whether the downstream acknowledged the
// notification: the body must contain "success" (case-insensitive). The
// HTTP 200 requirement is enforced by sendHTTP before the body gets here.
func isSuccessResponse(body string) bool {
	return strings.Contains(strings.ToLower(body), "success")
}

// NextRetryTime returns the time of the next retry for a record that has
// already been retried retryCount times. The boolean is false when the
// retry schedule has no further entry.
func NextRetryTime(retryCount int) (time.Time, bool) {
	idx := retryCount + 1
	if idx < 0 || idx >= len(retryIntervals) {
		return time.Time{}, false
	}
	return time.Now().Add(retryIntervals[idx]), true
}

// StartPoller starts a goroutine that calls ProcessRetryQueue every interval
// until ctx is cancelled. interval must be positive: time.NewTicker panics
// otherwise.
func (s *NotifyService) StartPoller(ctx context.Context, interval time.Duration, batchSize int) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// A batch interrupted by shutdown is expected, not an error.
				if err := s.ProcessRetryQueue(ctx, batchSize); err != nil && ctx.Err() == nil {
					slog.ErrorContext(ctx, "notify poller error", "err", err)
				}
			}
		}
	}()
	slog.Info("notify poller started", "interval", interval)
}