Files
pay-bridge/backend/internal/service/notify.go
2026-03-13 15:51:59 +08:00

230 lines
5.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"pay-bridge/internal/model"
"pay-bridge/internal/repository"
)
// retryIntervals is the delivery back-off schedule, indexed by attempt:
// index 0 is the immediate first push and indexes 1-8 are the waits before
// each of the eight follow-up retries (nine delivery chances in total).
var retryIntervals = []time.Duration{
	0 * time.Second,
	15 * time.Second,
	30 * time.Second,
	time.Minute,
	5 * time.Minute,
	30 * time.Minute,
	time.Hour,
	6 * time.Hour,
	12 * time.Hour,
}

// maxRetry is the highest retry index; processOne gives up once the next
// retry index would exceed it.
const maxRetry = 8
// NotifyService is the notification service: it pushes trade-order
// notifications to downstream notify URLs and tracks each delivery in the
// notify-log repository so failed pushes can be retried.
type NotifyService struct {
// notifyRepo stores notify logs (upsert, success/give-up marking, retry
// bookkeeping and listing of records due for retry).
notifyRepo *repository.NotifyLogRepository
// tradeRepo looks up the order by trade number when a payload is built.
tradeRepo *repository.TradeOrderRepository
// httpClient performs the outbound POST; its Timeout is set by
// NewNotifyService.
httpClient *http.Client
}
// NewNotifyService wires a NotifyService to the given repositories.
// httpTimeout is used as the Timeout of the HTTP client that sends the
// outbound notifications.
func NewNotifyService(
	notifyRepo *repository.NotifyLogRepository,
	tradeRepo *repository.TradeOrderRepository,
	httpTimeout time.Duration,
) *NotifyService {
	svc := new(NotifyService)
	svc.notifyRepo = notifyRepo
	svc.tradeRepo = tradeRepo
	svc.httpClient = &http.Client{Timeout: httpTimeout}
	return svc
}
// SendNotify makes the first delivery attempt of a notification for tradeNo
// to the downstream notifyURL.
//
// It builds the payload from the stored order, upserts a pending notify log
// and POSTs the payload. A successful delivery marks the log as successful;
// a failed one increments the retry counter and schedules the first retry
// retryIntervals[1] after the failure.
//
// The only error returned is a payload-building failure. A delivery failure
// is not an error for the caller: it is handled by scheduling a retry, and
// nil is returned.
func (s *NotifyService) SendNotify(ctx context.Context, tradeNo string, notifyType model.NotifyType, notifyURL string) error {
	payload, err := s.buildPayload(ctx, tradeNo, notifyType)
	if err != nil {
		return err
	}

	log := &model.NotifyLog{
		TradeNo:    tradeNo,
		NotifyType: notifyType,
		NotifyURL:  notifyURL,
		Status:     model.NotifyStatusPending,
		RetryCount: 0,
	}
	// Best effort: a failure to persist the log is reported but does not
	// prevent the notification from being sent.
	if err := s.notifyRepo.Upsert(ctx, log); err != nil {
		slog.ErrorContext(ctx, "upsert notify log failed", "trade_no", tradeNo, "err", err)
	}

	resp, err := s.sendHTTP(ctx, notifyURL, payload)
	if err == nil && isSuccessResponse(resp) {
		s.notifyRepo.MarkSuccess(ctx, log.ID, resp)
		slog.InfoContext(ctx, "notify success", "trade_no", tradeNo, "type", notifyType)
		return nil
	}

	// First attempt failed: record why and queue the first retry. The stored
	// message is the transport error if there was one, otherwise the
	// (non-success) response body.
	errMsg := resp
	if err != nil {
		errMsg = err.Error()
	}
	// Measure the back-off from the moment the attempt failed, as processOne
	// does. Using a timestamp taken before the HTTP call would shorten the
	// wait by the request latency (up to the client timeout).
	nextTime := time.Now().Add(retryIntervals[1])
	s.notifyRepo.IncrRetryCount(ctx, log.ID, model.NotifyStatusRetry, &nextTime, errMsg)
	slog.WarnContext(ctx, "notify failed, scheduled retry", "trade_no", tradeNo, "next_retry", nextTime)
	return nil
}
// ProcessRetryQueue runs one pass over the retry queue; it is invoked by the
// poller. It asks the notify-log repository for the records pending retry
// (passing the current time and batchSize) and re-attempts each of them in
// turn. Only a failure to list the records is returned as an error; the
// outcome of individual deliveries is handled inside processOne.
func (s *NotifyService) ProcessRetryQueue(ctx context.Context, batchSize int) error {
	due, listErr := s.notifyRepo.ListPendingRetry(ctx, time.Now(), batchSize)
	if listErr != nil {
		return listErr
	}
	for _, entry := range due {
		s.processOne(ctx, entry)
	}
	return nil
}
// processOne re-attempts delivery of a single queued notification.
//
// The payload is rebuilt from the current order state and POSTed to the
// log's notify URL. On success the log is marked successful. On failure the
// retry counter is advanced and the next attempt is scheduled from
// retryIntervals; when no retry slot remains (the next retry index would
// exceed maxRetry or run past the schedule) the log is marked as given up.
// A payload-building failure is only logged and leaves the record untouched.
func (s *NotifyService) processOne(ctx context.Context, log *model.NotifyLog) {
	payload, err := s.buildPayload(ctx, log.TradeNo, log.NotifyType)
	if err != nil {
		slog.ErrorContext(ctx, "build payload failed", "trade_no", log.TradeNo, "err", err)
		return
	}

	resp, err := s.sendHTTP(ctx, log.NotifyURL, payload)
	if err == nil && isSuccessResponse(resp) {
		s.notifyRepo.MarkSuccess(ctx, log.ID, resp)
		slog.InfoContext(ctx, "notify retry success", "trade_no", log.TradeNo, "retry_count", log.RetryCount)
		return
	}

	// Stored failure reason: the transport error if any, otherwise the
	// (non-success) response body.
	errMsg := resp
	if err != nil {
		errMsg = err.Error()
	}

	nextRetryIdx := log.RetryCount + 1
	if nextRetryIdx > maxRetry || nextRetryIdx >= len(retryIntervals) {
		// Every retry slot has been used.
		s.notifyRepo.MarkGiveup(ctx, log.ID)
		slog.WarnContext(ctx, "notify giveup after max retries", "trade_no", log.TradeNo)
		return
	}

	// A retry slot remains, so the record must stay in the retry state while
	// its next attempt is scheduled. Marking it given-up at
	// nextRetryIdx == maxRetry would schedule the final (12h) attempt but
	// presumably never run it, leaving one delivery chance fewer than the
	// schedule defines.
	nextTime := time.Now().Add(retryIntervals[nextRetryIdx])
	s.notifyRepo.IncrRetryCount(ctx, log.ID, model.NotifyStatusRetry, &nextTime, errMsg)
}
// buildPayload builds the JSON notification body for the order identified
// by tradeNo.
//
// The body always carries trade_no, merchant_order_no, app_id, pay_method,
// amount, status, notify_type and the current Unix timestamp.
// channel_trade_no and pay_time (Unix seconds) are added only when the order
// has them.
//
// It returns an error wrapping the repository error when the lookup fails,
// and an "order not found" error when the lookup succeeds but yields no
// order.
func (s *NotifyService) buildPayload(ctx context.Context, tradeNo string, notifyType model.NotifyType) ([]byte, error) {
	order, err := s.tradeRepo.GetByTradeNo(ctx, tradeNo)
	if err != nil {
		// Keep the underlying cause so a storage failure is not misreported
		// as a missing order.
		return nil, fmt.Errorf("load order %s: %w", tradeNo, err)
	}
	if order == nil {
		return nil, fmt.Errorf("order not found: %s", tradeNo)
	}

	payload := map[string]any{
		"trade_no":          order.TradeNo,
		"merchant_order_no": order.MerchantOrderNo,
		"app_id":            order.AppID,
		"pay_method":        order.PayMethod,
		"amount":            order.Amount,
		"status":            order.Status,
		"notify_type":       notifyType,
		"timestamp":         time.Now().Unix(),
	}
	if order.ChannelTradeNo != "" {
		payload["channel_trade_no"] = order.ChannelTradeNo
	}
	if order.PayTime != nil {
		payload["pay_time"] = order.PayTime.Unix()
	}
	return json.Marshal(payload)
}
// sendHTTP POSTs payload as JSON to the downstream notifyURL and returns at
// most the first 512 bytes of the response body.
//
// An error is returned when the request cannot be built or sent, when the
// response body cannot be read, or when the downstream answers with any
// status other than 200 OK. In the non-200 case the error message carries
// the status code and the body snippet so callers can record it as the
// failure reason.
func (s *NotifyService) sendHTTP(ctx context.Context, notifyURL string, payload []byte) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, notifyURL, bytes.NewReader(payload))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// Cap the read: only a short acknowledgement is expected, and the body is
	// also stored alongside the notify log by the callers.
	body, err := io.ReadAll(io.LimitReader(resp.Body, 512))
	if err != nil {
		return "", fmt.Errorf("read notify response: %w", err)
	}
	// Success requires HTTP 200; an error page that happens to contain
	// "success" must not be taken for an acknowledgement.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected notify response status %d: %s", resp.StatusCode, body)
	}
	return string(body), nil
}
// isSuccessResponse reports whether the downstream acknowledged the
// notification. The response body counts as an acknowledgement when it
// contains "success" anywhere, compared case-insensitively. Only the body
// text is inspected here; the HTTP status is not available to this function.
func isSuccessResponse(body string) bool {
	const marker = "success"
	lowered := strings.ToLower(body)
	return strings.Contains(lowered, marker)
}
// NextRetryTime computes when the next retry should run for a notification
// that has already been retried retryCount times: the current time plus
// retryIntervals[retryCount+1].
//
// The boolean result is false, and the time is the zero value, when that
// index falls outside the schedule: either the retries are exhausted or
// retryCount is out of range (for example negative).
func NextRetryTime(retryCount int) (time.Time, bool) {
	idx := retryCount + 1
	// The lower bound guards against an index-out-of-range panic on a
	// negative (or overflowed) retryCount.
	if idx < 0 || idx >= len(retryIntervals) {
		return time.Time{}, false
	}
	return time.Now().Add(retryIntervals[idx]), true
}
// StartPoller launches the background goroutine that drives notification
// retries. Every interval it runs ProcessRetryQueue with batchSize and logs
// any error that pass returns; the goroutine exits when ctx is done.
// StartPoller itself returns right after starting the goroutine and logging
// that the poller has started.
func (s *NotifyService) StartPoller(ctx context.Context, interval time.Duration, batchSize int) {
	loop := func() {
		tick := time.NewTicker(interval)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				err := s.ProcessRetryQueue(ctx, batchSize)
				if err == nil {
					continue
				}
				slog.Error("notify poller error", "err", err)
			case <-ctx.Done():
				return
			}
		}
	}
	go loop()
	slog.Info("notify poller started", "interval", interval)
}