draft
This commit is contained in:
229
backend/internal/service/notify.go
Normal file
229
backend/internal/service/notify.go
Normal file
@@ -0,0 +1,229 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"pay-bridge/internal/model"
|
||||
"pay-bridge/internal/repository"
|
||||
)
|
||||
|
||||
// 重试间隔:9 次推送机会(第1次立即,后续8次重试)
|
||||
var retryIntervals = []time.Duration{
|
||||
0,
|
||||
15 * time.Second,
|
||||
30 * time.Second,
|
||||
1 * time.Minute,
|
||||
5 * time.Minute,
|
||||
30 * time.Minute,
|
||||
1 * time.Hour,
|
||||
6 * time.Hour,
|
||||
12 * time.Hour,
|
||||
}
|
||||
|
||||
const maxRetry = 8
|
||||
|
||||
// NotifyService 通知服务
|
||||
type NotifyService struct {
|
||||
notifyRepo *repository.NotifyLogRepository
|
||||
tradeRepo *repository.TradeOrderRepository
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func NewNotifyService(
|
||||
notifyRepo *repository.NotifyLogRepository,
|
||||
tradeRepo *repository.TradeOrderRepository,
|
||||
httpTimeout time.Duration,
|
||||
) *NotifyService {
|
||||
return &NotifyService{
|
||||
notifyRepo: notifyRepo,
|
||||
tradeRepo: tradeRepo,
|
||||
httpClient: &http.Client{Timeout: httpTimeout},
|
||||
}
|
||||
}
|
||||
|
||||
// SendNotify 向下游发送通知(首次调用)
|
||||
func (s *NotifyService) SendNotify(ctx context.Context, tradeNo string, notifyType model.NotifyType, notifyURL string) error {
|
||||
// 构建通知内容
|
||||
payload, err := s.buildPayload(ctx, tradeNo, notifyType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 创建通知记录
|
||||
now := time.Now()
|
||||
log := &model.NotifyLog{
|
||||
TradeNo: tradeNo,
|
||||
NotifyType: notifyType,
|
||||
NotifyURL: notifyURL,
|
||||
Status: model.NotifyStatusPending,
|
||||
RetryCount: 0,
|
||||
}
|
||||
if err := s.notifyRepo.Upsert(ctx, log); err != nil {
|
||||
slog.ErrorContext(ctx, "upsert notify log failed", "trade_no", tradeNo, "err", err)
|
||||
}
|
||||
|
||||
// 发送通知
|
||||
resp, err := s.sendHTTP(ctx, notifyURL, payload)
|
||||
if err == nil && isSuccessResponse(resp) {
|
||||
s.notifyRepo.MarkSuccess(ctx, log.ID, resp)
|
||||
slog.InfoContext(ctx, "notify success", "trade_no", tradeNo, "type", notifyType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// 首次失败,写入重试队列
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
} else {
|
||||
errMsg = resp
|
||||
}
|
||||
|
||||
nextTime := now.Add(retryIntervals[1])
|
||||
s.notifyRepo.IncrRetryCount(ctx, log.ID, model.NotifyStatusRetry, &nextTime, errMsg)
|
||||
slog.WarnContext(ctx, "notify failed, scheduled retry", "trade_no", tradeNo, "next_retry", nextTime)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessRetryQueue 处理重试队列(由 Poller 调用)
|
||||
func (s *NotifyService) ProcessRetryQueue(ctx context.Context, batchSize int) error {
|
||||
logs, err := s.notifyRepo.ListPendingRetry(ctx, time.Now(), batchSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, log := range logs {
|
||||
s.processOne(ctx, log)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *NotifyService) processOne(ctx context.Context, log *model.NotifyLog) {
|
||||
payload, err := s.buildPayload(ctx, log.TradeNo, log.NotifyType)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "build payload failed", "trade_no", log.TradeNo, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := s.sendHTTP(ctx, log.NotifyURL, payload)
|
||||
if err == nil && isSuccessResponse(resp) {
|
||||
s.notifyRepo.MarkSuccess(ctx, log.ID, resp)
|
||||
slog.InfoContext(ctx, "notify retry success", "trade_no", log.TradeNo, "retry_count", log.RetryCount)
|
||||
return
|
||||
}
|
||||
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
} else {
|
||||
errMsg = resp
|
||||
}
|
||||
|
||||
nextRetryIdx := log.RetryCount + 1
|
||||
if nextRetryIdx > maxRetry {
|
||||
s.notifyRepo.MarkGiveup(ctx, log.ID)
|
||||
slog.WarnContext(ctx, "notify giveup after max retries", "trade_no", log.TradeNo)
|
||||
return
|
||||
}
|
||||
|
||||
var nextTime *time.Time
|
||||
if nextRetryIdx < len(retryIntervals) {
|
||||
t := time.Now().Add(retryIntervals[nextRetryIdx])
|
||||
nextTime = &t
|
||||
}
|
||||
|
||||
status := model.NotifyStatusRetry
|
||||
if nextRetryIdx >= maxRetry {
|
||||
status = model.NotifyStatusGiveup
|
||||
}
|
||||
|
||||
s.notifyRepo.IncrRetryCount(ctx, log.ID, status, nextTime, errMsg)
|
||||
}
|
||||
|
||||
// buildPayload 构建通知内容
|
||||
func (s *NotifyService) buildPayload(ctx context.Context, tradeNo string, notifyType model.NotifyType) ([]byte, error) {
|
||||
order, err := s.tradeRepo.GetByTradeNo(ctx, tradeNo)
|
||||
if err != nil || order == nil {
|
||||
return nil, fmt.Errorf("order not found: %s", tradeNo)
|
||||
}
|
||||
|
||||
payload := map[string]any{
|
||||
"trade_no": order.TradeNo,
|
||||
"merchant_order_no": order.MerchantOrderNo,
|
||||
"app_id": order.AppID,
|
||||
"pay_method": order.PayMethod,
|
||||
"amount": order.Amount,
|
||||
"status": order.Status,
|
||||
"notify_type": notifyType,
|
||||
"timestamp": time.Now().Unix(),
|
||||
}
|
||||
|
||||
if order.ChannelTradeNo != "" {
|
||||
payload["channel_trade_no"] = order.ChannelTradeNo
|
||||
}
|
||||
if order.PayTime != nil {
|
||||
payload["pay_time"] = order.PayTime.Unix()
|
||||
}
|
||||
|
||||
return json.Marshal(payload)
|
||||
}
|
||||
|
||||
// sendHTTP 向下游发送 HTTP POST 通知
|
||||
func (s *NotifyService) sendHTTP(ctx context.Context, notifyURL string, payload []byte) (string, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, notifyURL, bytes.NewReader(payload))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := s.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return string(body), nil
|
||||
}
|
||||
|
||||
// isSuccessResponse 判断下游是否返回成功
|
||||
// 下游返回 HTTP 200 且 body 包含 "success" 则视为成功
|
||||
func isSuccessResponse(body string) bool {
|
||||
return strings.Contains(strings.ToLower(body), "success")
|
||||
}
|
||||
|
||||
// NextRetryTime 计算下次重试时间
|
||||
func NextRetryTime(retryCount int) (time.Time, bool) {
|
||||
idx := retryCount + 1
|
||||
if idx >= len(retryIntervals) {
|
||||
return time.Time{}, false
|
||||
}
|
||||
return time.Now().Add(retryIntervals[idx]), true
|
||||
}
|
||||
|
||||
// StartPoller 启动通知重试 Poller goroutine
|
||||
func (s *NotifyService) StartPoller(ctx context.Context, interval time.Duration, batchSize int) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.ProcessRetryQueue(ctx, batchSize); err != nil {
|
||||
slog.Error("notify poller error", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
slog.Info("notify poller started", "interval", interval)
|
||||
}
|
||||
Reference in New Issue
Block a user