Files
ai-proj-helper/skills-integration/feishu-plugin/add_images.py
John Qiu 712063071c refactor: 通用技能按类别拆分为独立目录
skills/ → skills-dev(9), skills-req(10), skills-ops(4),
skills-integration(8), skills-biz(4), skills-workflow(7)

generate-marketplace.py 改为自动扫描所有 skills-* 目录。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 11:31:58 +10:30

323 lines
10 KiB
Python

#!/usr/bin/env python3
"""
添加需求图片字段并上传图片到飞书多维表格
"""
import requests
import os
from datetime import datetime, timedelta
from PIL import Image, ImageDraw, ImageFont
# ========== 配置 ==========
ZHIYUN_APP_ID = "cli_a9f29dca82b9dbef"
ZHIYUN_APP_SECRET = "sDfhjG7QT1S4gfHiMVYSygmPQPN1R2Ho"
BASE_URL = "https://open.feishu.cn/open-apis"
# Demo 创建的多维表格信息
APP_TOKEN = "D6PQbxf4aald77sPjDTciYbenjc"
TABLE_ID = "tblX3YbGrXm8pmLR"
class FeishuBitable:
"""飞书多维表格操作工具类"""
def __init__(self, app_id: str = ZHIYUN_APP_ID, app_secret: str = ZHIYUN_APP_SECRET):
self.app_id = app_id
self.app_secret = app_secret
self._token = None
self._token_expires = None
@property
def token(self) -> str:
if self._token and self._token_expires and datetime.now() < self._token_expires:
return self._token
url = f"{BASE_URL}/auth/v3/tenant_access_token/internal"
response = requests.post(url, json={
"app_id": self.app_id,
"app_secret": self.app_secret
})
data = response.json()
if data.get("code") != 0:
raise Exception(f"获取 token 失败: {data}")
self._token = data["tenant_access_token"]
self._token_expires = datetime.now() + timedelta(seconds=data.get("expire", 7200) - 60)
return self._token
@property
def headers(self):
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def create_field(self, app_token: str, table_id: str, field_name: str, field_type: int, property: dict = None):
"""创建新字段"""
url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables/{table_id}/fields"
payload = {
"field_name": field_name,
"type": field_type
}
if property:
payload["property"] = property
response = requests.post(url, headers=self.headers, json=payload)
data = response.json()
if data.get("code") != 0:
raise Exception(f"创建字段失败: {data}")
print(f"[OK] 字段 '{field_name}' 创建成功")
return data["data"]["field"]
def upload_media(self, app_token: str, file_path: str, file_name: str = None):
"""上传附件到多维表格"""
url = f"{BASE_URL}/drive/v1/medias/upload_all"
if file_name is None:
file_name = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
headers = {
"Authorization": f"Bearer {self.token}"
}
with open(file_path, 'rb') as f:
files = {
'file': (file_name, f, 'image/png')
}
data = {
'file_name': file_name,
'parent_type': 'bitable_image',
'parent_node': app_token,
'size': str(file_size)
}
response = requests.post(url, headers=headers, files=files, data=data)
result = response.json()
if result.get("code") != 0:
raise Exception(f"上传失败: {result}")
file_token = result["data"]["file_token"]
print(f"[OK] 文件 '{file_name}' 上传成功, file_token: {file_token}")
return file_token
def list_records(self, app_token: str, table_id: str, filter_str: str = None):
"""列出记录"""
url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables/{table_id}/records"
params = {"page_size": 100}
if filter_str:
params["filter"] = filter_str
response = requests.get(url, headers=self.headers, params=params)
data = response.json()
if data.get("code") != 0:
raise Exception(f"查询失败: {data}")
return data["data"].get("items", [])
def update_record(self, app_token: str, table_id: str, record_id: str, fields: dict):
"""更新记录"""
url = f"{BASE_URL}/bitable/v1/apps/{app_token}/tables/{table_id}/records/{record_id}"
response = requests.put(url, headers=self.headers, json={"fields": fields})
data = response.json()
if data.get("code") != 0:
raise Exception(f"更新失败: {data}")
print(f"[OK] 记录 {record_id} 更新成功")
return data["data"]["record"]
def generate_sample_images(output_dir: str):
"""生成2张示例需求图片"""
os.makedirs(output_dir, exist_ok=True)
images = []
# 图片1: 用户流程图
img1 = Image.new('RGB', (800, 600), color='#f0f4f8')
draw1 = ImageDraw.Draw(img1)
# 绘制标题
draw1.rectangle([50, 30, 750, 80], fill='#4a90d9', outline='#2563eb')
draw1.text((400, 55), "用户登录流程图", fill='white', anchor='mm')
# 绘制流程框
boxes = [
(150, 150, "打开登录页"),
(400, 150, "输入账号密码"),
(650, 150, "点击登录"),
(150, 300, "验证账号"),
(400, 300, "生成Token"),
(650, 300, "跳转首页"),
]
for x, y, text in boxes:
draw1.rectangle([x-60, y-25, x+60, y+25], fill='#e8f4fd', outline='#4a90d9', width=2)
draw1.text((x, y), text, fill='#1e3a5f', anchor='mm')
# 绘制箭头线
arrows = [
(210, 150, 340, 150),
(460, 150, 590, 150),
(650, 175, 650, 275),
(590, 300, 460, 300),
(340, 300, 210, 300),
]
for x1, y1, x2, y2 in arrows:
draw1.line([(x1, y1), (x2, y2)], fill='#4a90d9', width=2)
# 添加水印
draw1.text((400, 550), "Claude Code Demo - 需求图片1", fill='#94a3b8', anchor='mm')
img1_path = os.path.join(output_dir, "requirement_flow.png")
img1.save(img1_path)
images.append(img1_path)
print(f"[OK] 生成图片: {img1_path}")
# 图片2: 界面原型图
img2 = Image.new('RGB', (800, 600), color='#ffffff')
draw2 = ImageDraw.Draw(img2)
# 绘制浏览器框架
draw2.rectangle([50, 30, 750, 570], outline='#d1d5db', width=2)
draw2.rectangle([50, 30, 750, 70], fill='#f3f4f6', outline='#d1d5db')
# 浏览器按钮
draw2.ellipse([70, 42, 86, 58], fill='#ef4444')
draw2.ellipse([95, 42, 111, 58], fill='#eab308')
draw2.ellipse([120, 42, 136, 58], fill='#22c55e')
# 地址栏
draw2.rectangle([160, 42, 600, 58], fill='white', outline='#d1d5db')
draw2.text((170, 50), "https://example.com/login", fill='#6b7280', anchor='lm')
# 登录表单区域
draw2.rectangle([200, 120, 600, 500], fill='#f8fafc', outline='#e2e8f0', width=1)
# Logo 占位
draw2.ellipse([350, 140, 450, 200], fill='#4a90d9')
draw2.text((400, 170), "LOGO", fill='white', anchor='mm')
# 标题
draw2.text((400, 230), "欢迎登录", fill='#1e293b', anchor='mm')
# 输入框
draw2.rectangle([250, 270, 550, 310], fill='white', outline='#cbd5e1')
draw2.text((260, 290), "请输入用户名", fill='#94a3b8', anchor='lm')
draw2.rectangle([250, 330, 550, 370], fill='white', outline='#cbd5e1')
draw2.text((260, 350), "请输入密码", fill='#94a3b8', anchor='lm')
# 登录按钮
draw2.rectangle([250, 400, 550, 450], fill='#4a90d9', outline='#2563eb')
draw2.text((400, 425), "登 录", fill='white', anchor='mm')
# 底部链接
draw2.text((320, 480), "忘记密码", fill='#4a90d9', anchor='mm')
draw2.text((480, 480), "注册账号", fill='#4a90d9', anchor='mm')
# 水印
draw2.text((400, 550), "Claude Code Demo - 需求图片2", fill='#94a3b8', anchor='mm')
img2_path = os.path.join(output_dir, "requirement_ui.png")
img2.save(img2_path)
images.append(img2_path)
print(f"[OK] 生成图片: {img2_path}")
return images
def main():
print("\n" + "#" * 60)
print("# 添加需求图片到多维表格")
print("#" * 60)
bitable = FeishuBitable()
# Step 1: 生成示例图片
print("\n" + "=" * 50)
print("Step 1: 生成示例图片")
print("=" * 50)
output_dir = "/tmp/feishu_demo_images"
image_paths = generate_sample_images(output_dir)
# Step 2: 添加"需求图片"字段
print("\n" + "=" * 50)
print("Step 2: 添加「需求图片」字段")
print("=" * 50)
try:
bitable.create_field(
APP_TOKEN,
TABLE_ID,
"需求图片",
17 # 17 = 附件类型
)
except Exception as e:
if "FieldNameExist" in str(e) or "FieldNameDuplicated" in str(e) or "1254043" in str(e) or "1254014" in str(e):
print("[INFO] 字段「需求图片」已存在,跳过创建")
else:
raise
# Step 3: 上传图片
print("\n" + "=" * 50)
print("Step 3: 上传图片到飞书")
print("=" * 50)
file_tokens = []
for img_path in image_paths:
file_token = bitable.upload_media(APP_TOKEN, img_path)
file_tokens.append(file_token)
# Step 4: 查找"完成产品需求文档"记录
print("\n" + "=" * 50)
print("Step 4: 查找目标记录")
print("=" * 50)
records = bitable.list_records(APP_TOKEN, TABLE_ID)
target_record = None
for record in records:
if record["fields"].get("任务名称") == "完成产品需求文档":
target_record = record
break
if not target_record:
raise Exception("未找到「完成产品需求文档」记录")
print(f"[OK] 找到记录: {target_record['record_id']}")
print(f" 任务名称: {target_record['fields'].get('任务名称')}")
# Step 5: 更新记录,添加图片
print("\n" + "=" * 50)
print("Step 5: 更新记录,添加图片附件")
print("=" * 50)
# 构造附件字段值
attachments = [{"file_token": ft} for ft in file_tokens]
bitable.update_record(
APP_TOKEN,
TABLE_ID,
target_record["record_id"],
{"需求图片": attachments}
)
# 完成
print("\n" + "=" * 60)
print("完成!")
print("=" * 60)
print(f"\n已添加 {len(file_tokens)} 张图片到「完成产品需求文档」记录")
print(f"\n访问地址查看效果:")
print(f" https://zhiyuncai.feishu.cn/base/{APP_TOKEN}?table={TABLE_ID}")
print()
if __name__ == "__main__":
main()