refactor: 通用技能按类别拆分为独立目录

skills/ → skills-dev(9), skills-req(10), skills-ops(4),
skills-integration(8), skills-biz(4), skills-workflow(7)

generate-marketplace.py 改为自动扫描所有 skills-* 目录。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-14 11:31:58 +10:30
parent ea266e9cce
commit 712063071c
170 changed files with 341 additions and 346 deletions

View File

@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
WPS .dbt 文件迁移到飞书多维表格
"""
import pandas as pd
import requests
import json
import re
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
# ========== 配置 ==========
ZHIYUN_APP_ID = "cli_a9f29dca82b9dbef"
ZHIYUN_APP_SECRET = "sDfhjG7QT1S4gfHiMVYSygmPQPN1R2Ho"
BASE_URL = "https://open.feishu.cn/open-apis"
# 源文件
SOURCE_FILE = "/Users/donglinlai/Downloads/酷采团购系统优化进度表.dbt.xlsx"
class FeishuBitable:
"""飞书多维表格操作工具类"""
def __init__(self):
self._token = None
self._token_expires = None
@property
def token(self) -> str:
if self._token and self._token_expires and datetime.now() < self._token_expires:
return self._token
url = f"{BASE_URL}/auth/v3/tenant_access_token/internal"
response = requests.post(url, json={
"app_id": ZHIYUN_APP_ID,
"app_secret": ZHIYUN_APP_SECRET
})
data = response.json()
if data.get("code") != 0:
raise Exception(f"获取 token 失败: {data}")
self._token = data["tenant_access_token"]
self._token_expires = datetime.now() + timedelta(seconds=data.get("expire", 7200) - 60)
return self._token
@property
def headers(self):
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def create_bitable(self, name: str) -> Dict:
"""创建多维表格"""
url = f"{BASE_URL}/bitable/v1/apps"
response = requests.post(url, headers=self.headers, json={"name": name})
data = response.json()
if data.get("code") != 0:
raise Exception(f"创建多维表格失败: {data}")
return data["data"]["app"]
def create_table(self, app_token: str, name: str, fields: List[Dict]) -> Dict:
"""创建数据表"""
url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables"
payload = {
"table": {
"name": name,
"default_view_name": "默认视图",
"fields": fields
}
}
response = requests.post(url, headers=self.headers, json=payload)
data = response.json()
if data.get("code") != 0:
raise Exception(f"创建数据表失败: {data}")
return data["data"]
def batch_create_records(self, app_token: str, table_id: str,
records: List[Dict], batch_size: int = 100) -> int:
"""批量创建记录"""
url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_create"
total_created = 0
for i in range(0, len(records), batch_size):
batch = records[i:i+batch_size]
payload = {"records": [{"fields": r} for r in batch]}
response = requests.post(url, headers=self.headers, json=payload)
data = response.json()
if data.get("code") != 0:
print(f" [WARN] 批次 {i//batch_size + 1} 部分失败: {data.get('msg', '')}")
# 尝试逐条插入
for record in batch:
try:
single_url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables/{table_id}/records"
single_resp = requests.post(single_url, headers=self.headers, json={"fields": record})
if single_resp.json().get("code") == 0:
total_created += 1
except:
pass
else:
total_created += len(data["data"]["records"])
return total_created
def analyze_column_type(series: pd.Series, col_name: str) -> Dict:
"""分析列的数据类型,返回飞书字段定义"""
col_lower = col_name.lower()
# 根据列名判断类型
if any(kw in col_lower for kw in ['日期', '时间', 'date', 'time', '提出时间', '发版日期', '更新时间']):
return {"field_name": col_name, "type": 5, "property": {"date_formatter": "yyyy/MM/dd"}}
if any(kw in col_lower for kw in ['图片', '附件', '截图', 'image', 'file', 'attachment']):
return {"field_name": col_name, "type": 1} # 作为文本处理
if any(kw in col_lower for kw in ['优先级', '状态', '类型', '分类', '终端', '严重程度']):
# 提取唯一值作为选项
unique_vals = series.dropna().astype(str).unique()
unique_vals = [v for v in unique_vals if v and v != 'nan' and len(v) < 50][:20]
if len(unique_vals) > 0 and len(unique_vals) <= 20:
return {
"field_name": col_name,
"type": 3, # 单选
"property": {
"options": [{"name": str(v)} for v in unique_vals]
}
}
if any(kw in col_lower for kw in ['进度', '百分比', '%']):
return {"field_name": col_name, "type": 2} # 数字
# 检查是否为数字列
try:
numeric_vals = pd.to_numeric(series.dropna(), errors='coerce')
if numeric_vals.notna().sum() / max(len(series.dropna()), 1) > 0.8:
return {"field_name": col_name, "type": 2} # 数字
except:
pass
# 默认为文本
return {"field_name": col_name, "type": 1}
def clean_value(val: Any, field_type: int) -> Any:
"""清理和转换值"""
if pd.isna(val) or val is None:
return None
if field_type == 5: # 日期
try:
if isinstance(val, (datetime, pd.Timestamp)):
return int(val.timestamp() * 1000)
elif isinstance(val, str):
dt = pd.to_datetime(val)
return int(dt.timestamp() * 1000)
except:
return None
if field_type == 2: # 数字
try:
return float(val)
except:
return None
if field_type == 3: # 单选
val_str = str(val).strip()
if val_str and val_str != 'nan':
return val_str
return None
# 文本类型
val_str = str(val).strip()
if val_str == 'nan' or not val_str:
return None
# 限制文本长度
if len(val_str) > 10000:
val_str = val_str[:10000] + "..."
return val_str
def migrate_sheet(bitable: FeishuBitable, app_token: str,
df: pd.DataFrame, sheet_name: str) -> str:
"""迁移单个 Sheet 到数据表"""
print(f"\n{'='*50}")
print(f"迁移 Sheet: 【{sheet_name}")
print(f"{'='*50}")
# 清理列名
df.columns = [str(c).strip() for c in df.columns]
# 去除完全空的行
df = df.dropna(how='all')
print(f" 数据: {len(df)} 行, {len(df.columns)}")
# 分析字段类型
fields = []
field_types = {}
for col in df.columns:
if not col or col.startswith('Unnamed'):
continue
field_def = analyze_column_type(df[col], col)
fields.append(field_def)
field_types[col] = field_def["type"]
print(f" 字段: {len(fields)}")
# 创建数据表
table_info = bitable.create_table(app_token, sheet_name, fields)
table_id = table_info["table_id"]
print(f" [OK] 数据表创建成功: {table_id}")
# 准备记录数据
records = []
for _, row in df.iterrows():
record = {}
for col in df.columns:
if not col or col.startswith('Unnamed'):
continue
val = clean_value(row[col], field_types.get(col, 1))
if val is not None:
record[col] = val
if record: # 只添加非空记录
records.append(record)
print(f" 准备导入 {len(records)} 条记录...")
# 批量创建记录
if records:
created = bitable.batch_create_records(app_token, table_id, records)
print(f" [OK] 成功导入 {created}/{len(records)} 条记录")
else:
print(f" [INFO] 无有效数据")
return table_id
def main():
print("\n" + "#" * 60)
print("# WPS 文件迁移到飞书多维表格")
print("#" * 60)
bitable = FeishuBitable()
# Step 1: 读取源文件
print("\n" + "=" * 50)
print("Step 1: 读取源文件")
print("=" * 50)
xlsx = pd.ExcelFile(SOURCE_FILE)
print(f" 文件: {SOURCE_FILE}")
print(f" Sheet 数量: {len(xlsx.sheet_names)}")
# Step 2: 创建多维表格
print("\n" + "=" * 50)
print("Step 2: 创建飞书多维表格")
print("=" * 50)
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
bitable_name = f"酷采团购系统优化进度表 (迁移 {timestamp})"
app_info = bitable.create_bitable(bitable_name)
app_token = app_info["app_token"]
print(f" [OK] 多维表格创建成功")
print(f" 名称: {bitable_name}")
print(f" app_token: {app_token}")
# Step 3: 迁移每个 Sheet
print("\n" + "=" * 50)
print("Step 3: 迁移数据表")
print("=" * 50)
table_ids = {}
for sheet_name in xlsx.sheet_names:
df = pd.read_excel(xlsx, sheet_name=sheet_name)
table_id = migrate_sheet(bitable, app_token, df, sheet_name)
table_ids[sheet_name] = table_id
# 完成
print("\n" + "=" * 60)
print("迁移完成!")
print("=" * 60)
print(f"\n多维表格信息:")
print(f" 名称: {bitable_name}")
print(f" app_token: {app_token}")
print(f"\n数据表:")
for name, tid in table_ids.items():
print(f" - {name}: {tid}")
print(f"\n访问地址:")
print(f" https://zhiyuncai.feishu.cn/base/{app_token}")
print()
return app_token, table_ids
if __name__ == "__main__":
main()