"""
Vibe 的,目前我自己测试没问题,可以修改传入的 quality 还有分辨率。
适合不习惯用那个绘图功能、喜欢单独选模型用的佬友;可填 base url,要适配 dall-e-3 格式的 API 的。
title: BLTCY GPT-Image-2
description: Minimal OpenAI-compatible GPT-Image-2 generation and multi-turn editing for Open WebUI
author: Codex
version: 0.2.0
licence: MIT
"""
import base64
import hashlib
import io
import json
import logging
import mimetypes
import re
import time
import uuid
from typing import Any, AsyncIterable, Literal, Optional
import httpx
from fastapi import BackgroundTasks, Request, UploadFile
from httpx import Response
from open_webui.env import GLOBAL_LOG_LEVEL
from open_webui.models.users import UserModel, Users
from open_webui.routers.files import get_file_content_by_id, upload_file
from pydantic import BaseModel, Field
from starlette.datastructures import Headers
from starlette.responses import StreamingResponse
# Module-level logger, honoring Open WebUI's configured global log level.
logger = logging.getLogger(__name__)
logger.setLevel(GLOBAL_LOG_LEVEL)

# Model identifier advertised to Open WebUI and sent to the upstream API.
MODEL_ID = "gpt-image-2"
# Image generation can be slow; allow up to 10 minutes per HTTP request.
DEFAULT_TIMEOUT = 600
# Maximum number of reference images forwarded to the /images/edits endpoint.
MAX_REFERENCE_IMAGES = 4
class APIException(Exception):
    """Error raised for upstream API failures and local validation errors.

    ``__str__`` tries to extract a human-readable message from an
    OpenAI-style JSON error body, falling back to the HTTP error text
    and finally to the raw content.
    """

    # NOTE: the annotation is a forward reference so the class can be
    # defined even before httpx's Response type is imported/available.
    def __init__(
        self,
        status: int,
        content: str,
        response: Optional["Response"] = None,
    ):
        # Chain to Exception so args/pickling behave normally.
        super().__init__(content)
        self._status = status
        self._content = content
        self._response = response

    def __str__(self) -> str:
        # Prefer a structured OpenAI-style error message if the body is JSON.
        try:
            data = json.loads(self._content)
            if isinstance(data, dict):
                if isinstance(data.get("error"), dict):
                    return data["error"].get("message") or self._content
                if data.get("message"):
                    return str(data["message"])
        except Exception:
            # Body is not JSON; fall through to the HTTP-level message.
            pass
        if self._response is not None:
            try:
                # raise_for_status yields a descriptive "4xx/5xx ..." message.
                self._response.raise_for_status()
            except Exception as err:
                return str(err)
        return self._content or "Unknown API error"
class Pipe:
    """Open WebUI pipe exposing an OpenAI-compatible ``gpt-image-2`` model.

    Supports text-to-image generation (``/images/generations``) and
    multi-turn editing with reference images (``/images/edits``).
    Generated images are stored via Open WebUI's file API and returned
    to the chat as markdown image links.
    """

    class Valves(BaseModel):
        # Admin-configurable settings surfaced in the Open WebUI UI.
        base_url: str = Field(
            default="https://api.openai.com/v1",
            title="Base URL",
            description="OpenAI-compatible base URL",
        )
        api_key: str = Field(default="", title="API Key")
        default_quality: Literal["", "auto", "low", "medium", "high"] = Field(
            default="",
            title="默认质量",
            description="留空表示不传;可填 auto/low/medium/high",
        )
        default_size: str = Field(
            default="",
            title="默认分辨率",
            description="留空表示不传;可填 auto 或 1024x1024/1536x1024 等",
        )

    def __init__(self):
        self.valves = self.Valves()

    def pipes(self):
        """Advertise the single model this pipe provides."""
        return [{"id": MODEL_ID, "name": MODEL_ID}]

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __request__: Request,
    ) -> StreamingResponse:
        """Entry point called by Open WebUI; always streams SSE chunks."""
        return StreamingResponse(
            self._pipe(body=body, __user__=__user__, __request__=__request__),
            media_type="text/event-stream",
        )

    async def _pipe(
        self, body: dict, __user__: dict, __request__: Request
    ) -> AsyncIterable[str]:
        """Drive one generation/edit round trip and yield SSE data lines.

        Raises:
            APIException: on missing configuration, invalid input, or an
                upstream non-200 response.
        """
        if not self.valves.api_key:
            raise APIException(status=401, content="请先配置 API Key。")
        user = Users.get_user_by_id(__user__["id"])
        context = await self._build_context(
            user=user,
            body=body,
            max_reference_images=MAX_REFERENCE_IMAGES,
        )
        options = self._resolve_request_options()
        prompt = context["prompt"].strip()
        # Edits may arrive with only images attached; supply a default prompt.
        if not prompt and context["reference_images"]:
            prompt = "请基于参考图继续编辑"
        if not prompt:
            raise APIException(status=400, content="没有解析到可用的提示词。")
        endpoint, json_payload, form_data, files = self._build_request(
            prompt=prompt,
            reference_images=context["reference_images"],
            options=options,
        )
        async with httpx.AsyncClient(
            base_url=self.valves.base_url.rstrip("/"),
            headers={"Authorization": f"Bearer {self.valves.api_key}"},
            trust_env=True,
            timeout=DEFAULT_TIMEOUT,
        ) as client:
            # Edits go up as multipart (image files); generations as JSON.
            if files:
                response = await client.post(url=endpoint, data=form_data, files=files)
            else:
                response = await client.post(url=endpoint, json=json_payload)
        if response.status_code != 200:
            raise APIException(
                status=response.status_code,
                content=response.text,
                response=response,
            )
        data = response.json()
        results = await self._parse_response(
            data=data,
            __request__=__request__,
            user=user,
        )
        usage = self._build_usage(data.get("usage"))
        content = "\n\n".join(results)
        is_stream = bool(body.get("stream"))
        if not is_stream:
            # Non-streaming clients get a single completion-shaped payload.
            yield self._format_data(
                is_stream=False,
                model=MODEL_ID,
                content=content,
                usage=usage,
                finish_reason="stop",
            )
            return
        # Streaming: one content chunk, then (optionally) a usage chunk.
        yield self._format_data(
            is_stream=True,
            model=MODEL_ID,
            content=content,
            usage=None,
            finish_reason=None if usage else "stop",
        )
        if usage:
            yield self._format_data(
                is_stream=True,
                model=MODEL_ID,
                content=None,
                usage=usage,
                finish_reason="stop",
            )
        yield "data: [DONE]\n\n"

    async def _build_context(
        self,
        user: UserModel,
        body: dict,
        max_reference_images: int,
    ) -> dict[str, Any]:
        """Collect the latest user prompt and recent reference images.

        Scans the last six non-system messages; images are deduplicated
        by SHA-256 digest and capped at ``max_reference_images`` (keeping
        the most recent ones).
        """
        messages = [m for m in body.get("messages", []) if m.get("role") != "system"]
        recent_messages = messages[-6:]
        latest_user_texts: list[str] = []
        reference_images: list[dict[str, Any]] = []
        seen_digests: set[str] = set()
        for message in recent_messages:
            texts, images = await self._extract_message_parts(
                user=user,
                message_content=message.get("content"),
            )
            if message.get("role") == "user":
                # Overwritten each iteration so only the latest user turn wins.
                latest_user_texts = [text for text in texts if text.strip()]
            for image in images:
                if image["digest"] in seen_digests:
                    continue
                seen_digests.add(image["digest"])
                reference_images.append(image)
        if len(reference_images) > max_reference_images:
            reference_images = reference_images[-max_reference_images:]
        return {
            "prompt": "\n".join(latest_user_texts).strip(),
            "reference_images": reference_images,
        }

    async def _extract_message_parts(
        self,
        user: UserModel,
        message_content: Any,
    ) -> tuple[list[str], list[dict[str, Any]]]:
        """Split one message's content into text lines and image references.

        Handles both plain-string content (markdown image links to files
        generated by this pipe are resolved back into binaries) and the
        OpenAI list-of-parts format (``text`` / ``image_url`` items).
        """
        texts: list[str] = []
        images: list[dict[str, Any]] = []
        if isinstance(message_content, str):
            file_ids = self._extract_generated_file_ids(message_content)
            for file_id in file_ids:
                image = await self._load_image_from_openwebui_file(
                    user=user, file_id=file_id
                )
                if image:
                    images.append(image)
            # Strip markdown image syntax so it doesn't pollute the prompt.
            cleaned_text = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", message_content)
            for line in cleaned_text.splitlines():
                line = line.strip()
                if line:
                    texts.append(line)
            return texts, images
        if isinstance(message_content, list):
            for item in message_content:
                item_type = item.get("type")
                if item_type == "text":
                    text = (item.get("text") or "").strip()
                    if text:
                        texts.append(text)
                    continue
                if item_type == "image_url":
                    image_url = (item.get("image_url") or {}).get("url", "")
                    image = await self._load_image_from_url(
                        user=user, image_url=image_url
                    )
                    if image:
                        images.append(image)
            return texts, images
        return texts, images

    async def _load_image_from_url(
        self,
        user: UserModel,
        image_url: str,
    ) -> Optional[dict[str, Any]]:
        """Resolve an image URL (data URI, Open WebUI file URL, or http(s))
        into an image-reference dict, or None if unsupported/empty."""
        if not image_url:
            return None
        if image_url.startswith("data:"):
            # data:<mime>;base64,<payload>
            header, encoded = image_url.split(",", 1)
            mime_type = header.split(";")[0].split(":")[1]
            binary = base64.b64decode(encoded)
            return self._build_image_reference(
                binary=binary,
                mime_type=mime_type,
                filename=f"image-{uuid.uuid4().hex}{self._guess_extension(mime_type)}",
            )
        file_id = self._extract_file_id_from_url(image_url)
        if file_id:
            return await self._load_image_from_openwebui_file(
                user=user, file_id=file_id
            )
        if image_url.startswith(("http://", "https://")):
            async with httpx.AsyncClient(
                trust_env=True,
                timeout=DEFAULT_TIMEOUT,
            ) as client:
                response = await client.get(image_url)
                response.raise_for_status()
            mime_type = response.headers.get("content-type", "image/png").split(
                ";"
            )[0]
            return self._build_image_reference(
                binary=response.content,
                mime_type=mime_type,
                filename=f"image-{uuid.uuid4().hex}{self._guess_extension(mime_type)}",
            )
        return None

    async def _load_image_from_openwebui_file(
        self,
        user: UserModel,
        file_id: str,
    ) -> Optional[dict[str, Any]]:
        """Load an image stored in Open WebUI by file id; None on failure."""
        try:
            file_response = await get_file_content_by_id(id=file_id, user=user)
        except Exception:
            # Missing/inaccessible file: silently skip this reference.
            return None
        mime_type = mimetypes.guess_type(file_response.path)[0] or "image/png"
        with open(file_response.path, "rb") as file:
            binary = file.read()
        return self._build_image_reference(
            binary=binary,
            mime_type=mime_type,
            filename=getattr(file_response, "filename", None)
            or f"image-{file_id}{self._guess_extension(mime_type)}",
        )

    def _build_image_reference(
        self,
        binary: bytes,
        mime_type: str,
        filename: str,
    ) -> dict[str, Any]:
        """Package image bytes with metadata; the digest enables dedup."""
        return {
            "binary": binary,
            "mime_type": mime_type,
            "filename": filename,
            "digest": hashlib.sha256(binary).hexdigest(),
        }

    def _resolve_request_options(self) -> dict[str, Any]:
        """Read quality/size from the valves, validating a concrete size."""
        options = {
            "quality": self.valves.default_quality,
            "size": self.valves.default_size,
        }
        size = options["size"] or ""
        if size and size != "auto":
            self._validate_size_string(size)
            self._validate_gpt_image_2_size(size)
        return options

    def _validate_size_string(self, size: str) -> None:
        """Require ``auto`` or ``<width>x<height>``; raise APIException otherwise."""
        if size == "auto":
            return
        if re.fullmatch(r"\d+x\d+", size):
            return
        raise APIException(
            status=400,
            content="size 格式不合法,应为 auto 或 1024x1024 这种格式。",
        )

    def _validate_gpt_image_2_size(self, size: str) -> None:
        """Enforce gpt-image-2 dimension constraints on a WxH size string.

        Raises APIException when the long edge exceeds 3840, a dimension
        is not a multiple of 16, the aspect ratio exceeds 3:1, or the
        total pixel count is outside [655,360, 8,294,400].
        """
        width_str, height_str = size.split("x", 1)
        width = int(width_str)
        height = int(height_str)
        long_edge = max(width, height)
        short_edge = min(width, height)
        total_pixels = width * height
        if long_edge > 3840:
            raise APIException(
                status=400,
                content="gpt-image-2 的 size 最大边不能超过 3840。",
            )
        if width % 16 != 0 or height % 16 != 0:
            raise APIException(
                status=400,
                content="gpt-image-2 的宽高都必须是 16 的倍数。",
            )
        if long_edge / short_edge > 3:
            raise APIException(
                status=400,
                content="gpt-image-2 的长宽比不能超过 3:1。",
            )
        if total_pixels < 655360 or total_pixels > 8294400:
            raise APIException(
                status=400,
                content="gpt-image-2 的总像素必须在 655,360 到 8,294,400 之间。",
            )

    def _build_request(
        self,
        prompt: str,
        reference_images: list[dict[str, Any]],
        options: dict[str, Any],
    ) -> tuple[
        str,
        Optional[dict[str, Any]],
        Optional[dict[str, Any]],
        list[tuple[str, tuple[str, bytes, str]]],
    ]:
        """Build (endpoint, json_payload, form_data, files) for the API call.

        With reference images the request targets ``/images/edits`` as
        multipart; otherwise ``/images/generations`` as JSON.
        """
        if reference_images:
            data = {
                "model": MODEL_ID,
                "prompt": prompt,
            }
            self._assign_optional_request_fields(data, options)
            files: list[tuple[str, tuple[str, bytes, str]]] = []
            # OpenAI multi-image edits expect the "image[]" field name.
            field_name = "image[]" if len(reference_images) > 1 else "image"
            for image in reference_images:
                files.append(
                    (
                        field_name,
                        (image["filename"], image["binary"], image["mime_type"]),
                    )
                )
            return "/images/edits", None, data, files
        payload = {
            "model": MODEL_ID,
            "prompt": prompt,
        }
        self._assign_optional_request_fields(payload, options)
        return "/images/generations", payload, None, []

    def _assign_optional_request_fields(
        self,
        payload: dict[str, Any],
        options: dict[str, Any],
    ) -> None:
        """Copy quality/size into the payload only when non-empty."""
        if options.get("quality"):
            payload["quality"] = options["quality"]
        if options.get("size"):
            payload["size"] = options["size"]

    async def _parse_response(
        self,
        data: dict,
        __request__: Request,
        user: UserModel,
    ) -> list[str]:
        """Turn API response items into markdown snippets for the chat.

        Uploads each returned image (base64 or URL form) to Open WebUI
        storage; prepends any revised prompts. Raises APIException when
        no image came back.
        """
        results: list[str] = []
        revised_prompts: list[str] = []
        for item in data.get("data", []):
            revised_prompt = item.get("revised_prompt")
            if revised_prompt and revised_prompt not in revised_prompts:
                revised_prompts.append(revised_prompt)
            if item.get("b64_json"):
                results.append(
                    self._upload_image(
                        __request__=__request__,
                        user=user,
                        image_data=item["b64_json"],
                        mime_type=self._get_response_mime_type(item),
                    )
                )
                continue
            if item.get("url"):
                image_data, mime_type = await self._download_image(item["url"])
                results.append(
                    self._upload_image(
                        __request__=__request__,
                        user=user,
                        image_data=image_data,
                        mime_type=mime_type,
                    )
                )
        if revised_prompts:
            results.insert(0, "模型重写后的提示词:\n" + "\n".join(revised_prompts))
        if not results:
            raise APIException(status=500, content="接口没有返回可用图片。")
        return results

    async def _download_image(self, url: str) -> tuple[str, str]:
        """Fetch an image URL and return (base64-encoded bytes, mime type)."""
        async with httpx.AsyncClient(
            trust_env=True,
            timeout=DEFAULT_TIMEOUT,
        ) as client:
            response = await client.get(url)
            response.raise_for_status()
        mime_type = response.headers.get("content-type", "image/png").split(";")[0]
        encoded = base64.b64encode(response.content).decode("utf-8")
        return encoded, mime_type

    def _build_usage(self, usage: Optional[dict[str, Any]]) -> Optional[dict[str, Any]]:
        """Map the image API's usage block to chat-completion token fields."""
        if not usage:
            return None
        prompt_tokens = usage.get("input_tokens", 0)
        completion_tokens = usage.get("output_tokens", 0)
        total_tokens = usage.get("total_tokens", prompt_tokens + completion_tokens)
        # Preserve any extra provider-specific counters under "metadata".
        metadata = {
            key: value
            for key, value in usage.items()
            if key not in {"input_tokens", "output_tokens", "total_tokens"}
        }
        return {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": total_tokens,
            "prompt_tokens_details": usage.get("input_tokens_details", {}),
            "metadata": metadata,
        }

    def _upload_image(
        self,
        __request__: Request,
        user: UserModel,
        image_data: str,
        mime_type: str,
    ) -> str:
        """Store a base64 image via Open WebUI's file API; return markdown.

        The alt text embeds ``bltcy-image-<file_id>`` so later turns can
        recover the file id via ``_extract_generated_file_ids``.
        """
        extension = self._guess_extension(mime_type)
        file_item = upload_file(
            request=__request__,
            background_tasks=BackgroundTasks(),
            file=UploadFile(
                file=io.BytesIO(base64.b64decode(image_data)),
                filename=f"generated-image-{uuid.uuid4().hex}{extension}",
                headers=Headers({"content-type": mime_type}),
            ),
            process=False,
            user=user,
        metadata={"mime_type": mime_type},
        )
        image_url = __request__.app.url_path_for(
            "get_file_content_by_id", id=file_item.id
        )
        # FIX: the original "return f\"\"" had the markdown stripped by the
        # forum renderer; reconstruct the image link the regexes expect.
        return f"![bltcy-image-{file_item.id}]({image_url})"

    def _format_data(
        self,
        is_stream: bool,
        model: Optional[str],
        content: Optional[str],
        usage: Optional[dict[str, Any]],
        finish_reason: Optional[str],
    ) -> str:
        """Serialize one chat-completion(.chunk) payload as an SSE line."""
        data: dict[str, Any] = {
            "id": f"chat.{uuid.uuid4().hex}",
            "object": "chat.completion.chunk",
            "choices": [],
            "created": int(time.time()),
            "model": model,
        }
        if content is not None:
            data["choices"] = [
                {
                    "finish_reason": finish_reason,
                    "index": 0,
                    # Streaming chunks use "delta"; full responses use "message".
                    "delta" if is_stream else "message": {
                        "content": content,
                    },
                }
            ]
        if usage:
            data["usage"] = usage
        return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"

    def _extract_generated_file_ids(self, text: str) -> list[str]:
        """Pull file ids out of markdown images previously emitted by
        ``_upload_image`` (alt text ``bltcy-image-<file_id>``)."""
        return re.findall(
            r"!\[bltcy-image-([^\]]+)\]",
            text,
        )

    def _extract_file_id_from_url(self, url: str) -> Optional[str]:
        """Extract the file id from an Open WebUI /files/<id>/content URL."""
        match = re.search(r"/files/([^/]+)/content", url)
        if match:
            return match.group(1)
        return None

    def _guess_extension(self, mime_type: str) -> str:
        """Best-effort file extension for a mime type; default .png."""
        return mimetypes.guess_extension(mime_type) or ".png"

    def _get_response_mime_type(self, item: dict[str, Any]) -> str:
        """Determine the mime type of a response item; default image/png."""
        if item.get("mime_type"):
            return item["mime_type"]
        output_format = item.get("output_format")
        if output_format == "jpeg":
            return "image/jpeg"
        if output_format == "webp":
            return "image/webp"
        return "image/png"
# (forum footer residue, not code: 1 个帖子 - 1 位参与者)