#!/usr/bin/env python3
|
|
"""
|
|
WPS .dbt 文件迁移到飞书多维表格
|
|
"""
|
|
|
|
import json
import math
import re
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import pandas as pd
import requests
|
|
|
|
# ========== Configuration ==========
# NOTE(review): the app ID and app secret are hard-coded in source. Consider
# loading them from the environment or a secret store, and rotating this secret.
ZHIYUN_APP_ID = "cli_a9f29dca82b9dbef"
ZHIYUN_APP_SECRET = "sDfhjG7QT1S4gfHiMVYSygmPQPN1R2Ho"
# Prefix used to build every Feishu open-API URL in this file.
BASE_URL = "https://open.feishu.cn/open-apis"

# Source workbook opened by main().
SOURCE_FILE = "/Users/donglinlai/Downloads/酷采团购系统优化进度表.dbt.xlsx"
|
|
|
|
|
|
class FeishuBitable:
    """Small helper around the Feishu Bitable open API.

    Obtains and caches a tenant access token, and wraps the calls this script
    needs: create a Bitable app, create a table, and insert records.
    """

    # Seconds to wait for a single HTTP call. Without a timeout, `requests`
    # waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self._token = None
        self._token_expires = None

    @property
    def token(self) -> str:
        """Return a tenant access token, fetching a new one when needed.

        The cached token is reused until its recorded expiry time.

        Raises:
            Exception: if the API response carries a non-zero ``code``.
        """
        if self._token and self._token_expires and datetime.now() < self._token_expires:
            return self._token

        url = f"{BASE_URL}/auth/v3/tenant_access_token/internal"
        response = requests.post(
            url,
            json={
                "app_id": ZHIYUN_APP_ID,
                "app_secret": ZHIYUN_APP_SECRET,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        data = response.json()

        if data.get("code") != 0:
            raise Exception(f"获取 token 失败: {data}")

        self._token = data["tenant_access_token"]
        # Expire the cache 60 seconds early so a token is never used right at
        # its deadline.
        self._token_expires = datetime.now() + timedelta(seconds=data.get("expire", 7200) - 60)
        return self._token

    @property
    def headers(self):
        """Return the request headers, including the bearer token."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    def _post(self, url: str, payload: Dict) -> Dict:
        """POST ``payload`` as JSON with auth headers and return the parsed body."""
        response = requests.post(
            url,
            headers=self.headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def create_bitable(self, name: str) -> Dict:
        """Create a Bitable app named ``name`` and return its ``app`` object.

        Raises:
            Exception: if the API response carries a non-zero ``code``.
        """
        data = self._post(f"{BASE_URL}/bitable/v1/apps", {"name": name})

        if data.get("code") != 0:
            raise Exception(f"创建多维表格失败: {data}")

        return data["data"]["app"]

    def create_table(self, app_token: str, name: str, fields: List[Dict]) -> Dict:
        """Create a table with the given field definitions.

        Returns the ``data`` object of the API response.

        Raises:
            Exception: if the API response carries a non-zero ``code``.
        """
        url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables"
        payload = {
            "table": {
                "name": name,
                "default_view_name": "默认视图",
                "fields": fields,
            }
        }
        data = self._post(url, payload)

        if data.get("code") != 0:
            raise Exception(f"创建数据表失败: {data}")

        return data["data"]

    def batch_create_records(self, app_token: str, table_id: str,
                             records: List[Dict], batch_size: int = 100) -> int:
        """Insert ``records`` in batches and return how many were created.

        Each record is a mapping of field name to value. When a batch is
        rejected, its records are retried one at a time so that a single bad
        record does not discard the whole batch. Failures in that fallback are
        reported and skipped, not raised.
        """
        table_url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables/{table_id}/records"
        batch_url = f"{table_url}/batch_create"
        total_created = 0

        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]
            data = self._post(batch_url, {"records": [{"fields": r} for r in batch]})

            if data.get("code") == 0:
                total_created += len(data["data"]["records"])
                continue

            print(f"  [WARN] 批次 {start//batch_size + 1} 部分失败: {data.get('msg', '')}")
            # Fall back to inserting the records of this batch one by one.
            for record in batch:
                try:
                    result = self._post(table_url, {"fields": record})
                except Exception as exc:
                    # Best effort: keep going, but say why the record was
                    # lost. Unlike a bare `except`, this lets Ctrl-C through.
                    print(f"  [WARN] 单条记录写入失败: {exc}")
                    continue
                if result.get("code") == 0:
                    total_created += 1
                else:
                    print(f"  [WARN] 单条记录写入失败: {result.get('msg', '')}")

        return total_created
|
|
|
|
|
|
def analyze_column_type(series: pd.Series, col_name: str) -> Dict:
    """Infer a Feishu field definition for one spreadsheet column.

    The column name is checked for keywords first; the values are inspected
    only if no keyword decides the type. Field type codes used by this file:
    1 = text, 2 = number, 3 = single select, 5 = date.

    Args:
        series: The column's values.
        col_name: The column header, used as the field name.

    Returns:
        A dict with ``field_name`` and ``type``, plus ``property`` for date
        and single-select fields.
    """
    col_lower = col_name.lower()

    def name_has(keywords) -> bool:
        return any(kw in col_lower for kw in keywords)

    date_field = {"field_name": col_name, "type": 5, "property": {"date_formatter": "yyyy/MM/dd"}}

    # Decide by column name first.
    if name_has(('日期', '时间', 'date', 'time', '提出时间', '发版日期', '更新时间')):
        return date_field

    if name_has(('图片', '附件', '截图', 'image', 'file', 'attachment')):
        return {"field_name": col_name, "type": 1}  # handled as plain text

    if name_has(('优先级', '状态', '类型', '分类', '终端', '严重程度')):
        # Build the options from distinct values. Values are stripped because
        # clean_value() strips what it writes; otherwise " x" and "x" would
        # become two options. Order of first appearance is kept, max 20.
        stripped = (str(v).strip() for v in series.dropna())
        options = list(dict.fromkeys(
            v for v in stripped if v and v != 'nan' and len(v) < 50
        ))[:20]
        if options:
            return {
                "field_name": col_name,
                "type": 3,  # single select
                "property": {
                    "options": [{"name": v} for v in options]
                },
            }

    if name_has(('进度', '百分比', '%')):
        return {"field_name": col_name, "type": 2}  # number

    # Datetime columns must be caught before the numeric probe below:
    # pd.to_numeric turns datetimes into integers, which would make them
    # number fields whose values can never be converted.
    if pd.api.types.is_datetime64_any_dtype(series):
        return date_field

    # Treat the column as numeric when over 80% of its non-null values parse.
    non_null = series.dropna()
    try:
        numeric_vals = pd.to_numeric(non_null, errors='coerce')
    except (TypeError, ValueError):
        numeric_vals = None
    if numeric_vals is not None and numeric_vals.notna().sum() / max(len(non_null), 1) > 0.8:
        return {"field_name": col_name, "type": 2}  # number

    # Default to text.
    return {"field_name": col_name, "type": 1}
|
|
|
|
|
|
def clean_value(val: Any, field_type: int) -> Any:
    """Convert one cell value to the form written for the given field type.

    Field type codes used by this file: 1 = text, 2 = number,
    3 = single select, 5 = date.

    Args:
        val: The raw cell value.
        field_type: The field type code chosen for the column.

    Returns:
        - date: integer milliseconds since the epoch
        - number: a finite float
        - single select / text: a stripped, non-empty string (text is cut at
          10000 characters and suffixed with "...")
        - ``None`` whenever the value is missing or cannot be converted, so
          the caller can omit the field.
    """
    if val is None or pd.isna(val):
        return None

    if field_type == 5:  # date
        if isinstance(val, str):
            try:
                val = pd.to_datetime(val)
            except (ValueError, TypeError, OverflowError):
                return None
        # Anything that is not a datetime by now (e.g. a bare number) is
        # dropped; it must not fall through and be written as text.
        if not isinstance(val, (datetime, pd.Timestamp)) or pd.isna(val):
            return None
        # NOTE: a naive pd.Timestamp is read as UTC by .timestamp(), while a
        # naive stdlib datetime is read as local time.
        try:
            return int(val.timestamp() * 1000)
        except (ValueError, OverflowError, OSError):
            return None

    if field_type == 2:  # number
        try:
            number = float(val)
        except (TypeError, ValueError, OverflowError):
            return None
        # NaN and infinity cannot be represented in JSON; treat as missing.
        return number if math.isfinite(number) else None

    if field_type == 3:  # single select
        val_str = str(val).strip()
        if val_str and val_str != 'nan':
            return val_str
        return None

    # Text
    val_str = str(val).strip()
    if val_str == 'nan' or not val_str:
        return None

    # Cap the text length.
    if len(val_str) > 10000:
        val_str = val_str[:10000] + "..."

    return val_str
|
|
|
|
|
|
def migrate_sheet(bitable: FeishuBitable, app_token: str,
                  df: pd.DataFrame, sheet_name: str) -> str:
    """Migrate one worksheet into a new table of the given Bitable app.

    Creates a table named ``sheet_name`` with one field per usable column,
    then inserts every non-empty row. Columns with an empty header or a
    header starting with "Unnamed" are skipped.

    Args:
        bitable: Client used for the API calls.
        app_token: Token of the target Bitable app.
        df: The worksheet data; it is not modified.
        sheet_name: Name for the new table.

    Returns:
        The ``table_id`` of the created table.
    """
    print(f"\n{'='*50}")
    print(f"迁移 Sheet: 【{sheet_name}】")
    print(f"{'='*50}")

    # Drop fully empty rows first. dropna() returns a new frame, so renaming
    # its columns below does not touch the caller's DataFrame.
    df = df.dropna(how='all')
    df.columns = [str(c).strip() for c in df.columns]

    print(f"  数据: {len(df)} 行, {len(df.columns)} 列")

    # Work out the usable columns once and infer a field for each.
    columns = [c for c in df.columns if c and not c.startswith('Unnamed')]
    fields = [analyze_column_type(df[col], col) for col in columns]
    field_types = {col: field_def["type"] for col, field_def in zip(columns, fields)}

    print(f"  字段: {len(fields)} 个")

    # Create the table.
    table_info = bitable.create_table(app_token, sheet_name, fields)
    table_id = table_info["table_id"]
    print(f"  [OK] 数据表创建成功: {table_id}")

    # Build the records. itertuples() keeps each column's own dtype, whereas
    # iterrows() may upcast values to a common per-row dtype.
    records = []
    for values in df[columns].itertuples(index=False, name=None):
        record = {}
        for col, raw in zip(columns, values):
            val = clean_value(raw, field_types[col])
            if val is not None:
                record[col] = val
        if record:  # keep only rows that carry at least one value
            records.append(record)

    print(f"  准备导入 {len(records)} 条记录...")

    # Insert the records in batches.
    if records:
        created = bitable.batch_create_records(app_token, table_id, records)
        print(f"  [OK] 成功导入 {created}/{len(records)} 条记录")
    else:
        print("  [INFO] 无有效数据")

    return table_id
|
|
|
|
|
|
def main():
    """Run the full migration: read the workbook, create the Bitable, copy each sheet.

    Returns:
        A tuple ``(app_token, table_ids)`` where ``table_ids`` maps each sheet
        name to the id of the table created for it.
    """
    print("\n" + "#" * 60)
    print("# WPS 文件迁移到飞书多维表格")
    print("#" * 60)

    bitable = FeishuBitable()

    # Step 1: open the source workbook. The context manager closes the file
    # handle even if a later step raises.
    print("\n" + "=" * 50)
    print("Step 1: 读取源文件")
    print("=" * 50)

    with pd.ExcelFile(SOURCE_FILE) as xlsx:
        print(f"  文件: {SOURCE_FILE}")
        print(f"  Sheet 数量: {len(xlsx.sheet_names)}")

        # Step 2: create the Bitable app.
        print("\n" + "=" * 50)
        print("Step 2: 创建飞书多维表格")
        print("=" * 50)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        bitable_name = f"酷采团购系统优化进度表 (迁移 {timestamp})"

        app_info = bitable.create_bitable(bitable_name)
        app_token = app_info["app_token"]
        print("  [OK] 多维表格创建成功")
        print(f"  名称: {bitable_name}")
        print(f"  app_token: {app_token}")

        # Step 3: migrate every sheet into its own table.
        print("\n" + "=" * 50)
        print("Step 3: 迁移数据表")
        print("=" * 50)

        table_ids = {}
        for sheet_name in xlsx.sheet_names:
            df = pd.read_excel(xlsx, sheet_name=sheet_name)
            table_ids[sheet_name] = migrate_sheet(bitable, app_token, df, sheet_name)

    # Summary
    print("\n" + "=" * 60)
    print("迁移完成!")
    print("=" * 60)
    print("\n多维表格信息:")
    print(f"  名称: {bitable_name}")
    print(f"  app_token: {app_token}")
    print("\n数据表:")
    for name, tid in table_ids.items():
        print(f"  - {name}: {tid}")
    print("\n访问地址:")
    print(f"  https://zhiyuncai.feishu.cn/base/{app_token}")
    print()

    return app_token, table_ids
|
|
|
|
|
|
# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    main()
|