add ai image base

This commit is contained in:
XIE7654
2025-07-21 12:24:11 +08:00
parent 3131480afa
commit 816668530c
12 changed files with 449 additions and 2 deletions

View File

@@ -0,0 +1,50 @@
import os
from enum import Enum
from pydantic import SecretStr
class ProviderEnum(str, Enum):
"""支持的 LLM 服务商"""
DEEPSEEK = "deepseek"
OPENAI = "openai"
TONGYI = "tongyi"
class LLMFactory(object):
@staticmethod
def get_llm(provider: ProviderEnum, model: str = None, **kwargs):
if provider == ProviderEnum.DEEPSEEK:
from langchain_deepseek import ChatDeepSeek
api_key = os.getenv("DEEPSEEK_API_KEY")
model = model or "deepseek-chat"
return ChatDeepSeek(
api_key=SecretStr(api_key),
model=model,
streaming=True,
**kwargs
)
elif provider == ProviderEnum.OPENAI:
from langchain_openai import ChatOpenAI
api_key = os.getenv("OPENAI_API_KEY")
model = model or "gpt-3.5-turbo"
return ChatOpenAI(
api_key=SecretStr(api_key),
model=model,
streaming=True,
**kwargs
)
elif provider == ProviderEnum.TONGYI:
from langchain_community.llms import Tongyi
api_key = os.getenv("DASHSCOPE_API_KEY")
model = model or "qwen-turbo"
return Tongyi(
api_key=SecretStr(api_key),
model=model,
streaming=True,
**kwargs
)
else:
raise ValueError(f"不支持的 LLM 服务商: {provider}")

View File

View File

@@ -0,0 +1,17 @@
from langchain_deepseek import ChatDeepSeek
from llm.base import MultiModalAICapability
class DeepSeekAdapter(MultiModalAICapability):
def __init__(self, api_key, model, **kwargs):
self.llm = ChatDeepSeek(api_key=api_key, model=model, streaming=True)
async def chat(self, messages, **kwargs):
# 兼容 DeepSeek 的调用方式
return await self.llm.ainvoke(messages)
async def stream_chat(self, messages, **kwargs):
async for chunk in self.llm.astream(messages):
yield chunk

View File

@@ -0,0 +1,21 @@
# 假设有 google genai sdk
# from google_genai import GenAI
from llm.base import MultiModalAICapability
class GoogleGenAIAdapter(MultiModalAICapability):
def __init__(self, api_key, model, **kwargs):
self.api_key = api_key
self.model = model
# self.llm = GenAI(api_key=api_key, model=model)
async def chat(self, messages, **kwargs):
# return await self.llm.chat(messages)
raise NotImplementedError("Google GenAI chat未实现")
async def stream_chat(self, messages, **kwargs):
# async for chunk in self.llm.stream_chat(messages):
# yield chunk
raise NotImplementedError("Google GenAI stream_chat未实现")
# 其他能力同理

View File

@@ -0,0 +1,25 @@
from llm.base import MultiModalAICapability
from langchain_openai import ChatOpenAI
# from openai import OpenAI # 如需图片/音频/视频等API
class OpenAIAdapter(MultiModalAICapability):
def __init__(self, api_key, model, **kwargs):
self.llm = ChatOpenAI(api_key=api_key, model=model, streaming=True)
self.api_key = api_key
async def chat(self, messages, **kwargs):
return await self.llm.ainvoke(messages)
async def stream_chat(self, messages, **kwargs):
async for chunk in self.llm.astream(messages):
yield chunk
# 如需图片生成DALL·E可实现如下
def create_image_task(self, prompt, **kwargs):
# 伪代码,需用 openai.Image.create
# import openai
# response = openai.Image.create(api_key=self.api_key, prompt=prompt, ...)
# return response
raise NotImplementedError("OpenAI 图片生成请用 openai.Image.create 实现")
# 其他能力同理

View File

@@ -0,0 +1,51 @@
from langchain_deepseek import ChatDeepSeek
from http import HTTPStatus
from urllib.parse import urlparse, unquote
from pathlib import PurePosixPath
import requests
from dashscope import ImageSynthesis
import os
from llm.base import MultiModalAICapability
class TongYiAdapter(MultiModalAICapability):
def __init__(self, api_key, model, **kwargs):
self.api_key = api_key
self.model = model
self.llm = ChatDeepSeek(api_key=api_key, model=model, streaming=True)
async def chat(self, messages, **kwargs):
# 兼容 DeepSeek 的调用方式
return await self.llm.ainvoke(messages)
async def stream_chat(self, messages, **kwargs):
async for chunk in self.llm.astream(messages):
yield chunk
@staticmethod
def create_image_task(api_key, model, prompt: str, style='<watercolor>', size='1024*1024', n=1):
"""创建异步图片生成任务"""
rsp = ImageSynthesis.async_call(
api_key=api_key,
model=model,
prompt=prompt,
n=n,
style=style,
size=size
)
if rsp.status_code == HTTPStatus.OK:
return rsp
else:
raise Exception(f"Failed, status_code: {rsp.status_code}, code: {rsp.code}, message: {rsp.message}")
@staticmethod
def fetch_image_task_status(task):
"""获取异步图片任务状态"""
status = ImageSynthesis.fetch(task)
if status.status_code == HTTPStatus.OK:
return status.output.task_status
else:
raise Exception(f"Failed, status_code: {status.status_code}, code: {status.code}, message: {status.message}")

37
ai_service/llm/base.py Normal file
View File

@@ -0,0 +1,37 @@
from abc import ABC
class MultiModalAICapability(ABC):
# 对话能力
async def chat(self, messages, **kwargs):
raise NotImplementedError("chat not supported by this provider")
async def stream_chat(self, messages, **kwargs):
raise NotImplementedError("stream_chat not supported by this provider")
# 图片生成能力
def create_image_task(self, prompt, **kwargs):
raise NotImplementedError("image generation not supported by this provider")
def fetch_image_task_status(self, task):
raise NotImplementedError("image task status not supported by this provider")
def fetch_image_result(self, task):
raise NotImplementedError("image result not supported by this provider")
# 视频生成能力
def create_video_task(self, prompt, **kwargs):
raise NotImplementedError("video generation not supported by this provider")
def fetch_video_task_status(self, task):
raise NotImplementedError("video task status not supported by this provider")
def fetch_video_result(self, task):
raise NotImplementedError("video result not supported by this provider")
# 知识库能力
def query_knowledge(self, query, **kwargs):
raise NotImplementedError("knowledge query not supported by this provider")
# 语音合成能力
def synthesize_speech(self, text, **kwargs):
raise NotImplementedError("speech synthesis not supported by this provider")

34
ai_service/llm/factory.py Normal file
View File

@@ -0,0 +1,34 @@
from .adapter.deepseek import DeepSeekAdapter
from .adapter.genai import GoogleGenAIAdapter
from .adapter.openai import OpenAIAdapter
from .adapter.tongyi import TongYiAdapter
def get_adapter(provider, api_key, model, **kwargs):
if provider == 'deepseek':
return DeepSeekAdapter(api_key, model, **kwargs)
elif provider == 'tongyi':
return TongYiAdapter(api_key, model, **kwargs)
elif provider == 'openai':
return OpenAIAdapter(api_key, model, **kwargs)
elif provider == 'google-genai':
return GoogleGenAIAdapter(api_key, model, **kwargs)
else:
raise ValueError('不支持的服务商')
# 使用示例
# adapter = get_adapter('tongyi', api_key='xxx', model='wanx_v1')
# 对话
# try:
# result = await adapter.chat(messages)
# except NotImplementedError:
# print("该服务商不支持对话能力")
# # 图片生成
# try:
# task = adapter.create_image_task(prompt="一只猫")
# status = adapter.fetch_image_task_status(task)
# result = adapter.fetch_image_result(task)
# except NotImplementedError:
# print("该服务商不支持图片生成")