![]()
由于我自己本地CPA只有free了,50个,自己蹬基本满足了。
CPA的路由策略只有rr和ff。
然后 奥特曼告诉我不要浪费周限额,于是我就让codex写了个脚本,基本原理就是给周限额快重置且还有额度的账号设置一个高优先级优先使用。
- 默认优先用备注,备注缺失才请求 API。
--refresh:强制每个账号都请求 API,并更新备注。--refresh-expired-note:备注里的reset_at过期时刷新。--dry-run:只打印计划,不写 priority。--min-priority / --max-priority:控制优先级范围。used >= 100自动降到priority=0。
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone
import requests
DEFAULT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"
def build_parser():
p = argparse.ArgumentParser(
description="Sync CLIProxyAPI Codex auth priorities by weekly quota reset time."
)
p.add_argument("--base-url", default=os.getenv("CPA_BASE_URL", "http://127.0.0.1:8317"))
p.add_argument("--key", default=os.getenv("CPA_MANAGEMENT_KEY"), help="Management API key")
p.add_argument("--refresh", action="store_true", help="Always fetch quota from wham/usage and update notes")
p.add_argument("--refresh-expired-note", action="store_true", help="Fetch quota when cached note reset_at is expired")
p.add_argument("--dry-run", action="store_true", help="Print actions without PATCHing priorities")
p.add_argument("--max-priority", type=int, default=1000)
p.add_argument("--min-priority", type=int, default=1)
p.add_argument("--exhausted-priority", type=int, default=0)
p.add_argument("--user-agent", default=os.getenv("CODEX_USAGE_UA", DEFAULT_UA))
p.add_argument("--log-level", default=os.getenv("LOG_LEVEL", "INFO"))
return p
class Client:
def __init__(self, base_url, key):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
"Accept": "application/json",
})
def api(self, method, path, **kwargs):
resp = self.session.request(method, self.base_url + path, timeout=30, **kwargs)
if resp.status_code >= 400:
logging.warning("request failed %s %s status=%s body=%s", method, path, resp.status_code, resp.text[:500])
resp.raise_for_status()
return resp.json()
def parse_time_to_epoch(value):
if value is None:
return None
if isinstance(value, (int, float)):
return int(value)
s = str(value).strip()
if not s:
return None
if s.isdigit():
return int(s)
if s.endswith("Z"):
s = s[:-1] + "+00:00"
try:
return int(datetime.fromisoformat(s).astimezone(timezone.utc).timestamp())
except ValueError:
return None
def fmt_ts(ts):
return datetime.fromtimestamp(int(ts), tz=timezone.utc).astimezone().isoformat(timespec="seconds")
def parse_note(note):
if not note:
return None, None
reset_match = re.search(r"reset_at=([^;]+)", note)
used_match = re.search(r"used=([0-9]+(?:\.[0-9]+)?)%?", note)
reset_at = parse_time_to_epoch(reset_match.group(1).strip()) if reset_match else None
used = float(used_match.group(1)) if used_match else None
return reset_at, used
def get_codex_auths(client):
data = client.api("GET", "/v0/management/auth-files")
auths = []
for item in data.get("files", []):
if item.get("type") != "codex":
continue
if item.get("disabled"):
logging.info("skip disabled auth: %s", item.get("name"))
continue
if not item.get("name") or not item.get("auth_index"):
logging.warning("skip auth missing name/auth_index: %s", item)
continue
auths.append(item)
logging.info("active codex auths=%d", len(auths))
return auths
def fetch_usage(client, auth, user_agent):
name = auth["name"]
logging.info("fetch usage from upstream: %s", name)
headers = {
"Authorization": "Bearer $TOKEN$",
"Content-Type": "application/json",
"User-Agent": user_agent,
}
account_id = auth.get("account") or auth.get("account_id")
if account_id:
headers["Chatgpt-Account-Id"] = account_id
data = client.api("POST", "/v0/management/api-call", json={
"authIndex": auth["auth_index"],
"method": "GET",
"url": "https://chatgpt.com/backend-api/wham/usage",
"header": headers,
})
if data.get("status_code") != 200:
raise RuntimeError(f"upstream status={data.get('status_code')}, body={(data.get('body') or '')[:300]}")
body = json.loads(data.get("body") or "{}")
rate_limit = body.get("rate_limit") or {}
primary = rate_limit.get("primary_window") or {}
reset_at = primary.get("reset_at")
used = primary.get("used_percent")
if reset_at is None or used is None:
raise RuntimeError("missing reset_at or used_percent in upstream usage body")
return int(reset_at), float(used)
def resolve_usage(client, auth, args):
name = auth["name"]
if args.refresh:
reset_at, used = fetch_usage(client, auth, args.user_agent)
return reset_at, used, "api"
reset_at, used = parse_note(auth.get("note") or "")
now = int(datetime.now(timezone.utc).timestamp())
if reset_at is not None and used is not None:
if args.refresh_expired_note and reset_at <= now:
logging.info("cached note expired, refreshing: name=%s reset_at=%s", name, fmt_ts(reset_at))
else:
logging.info("use cached note: name=%s reset_at=%s used=%s", name, fmt_ts(reset_at), used)
return reset_at, used, "note"
reset_at, used = fetch_usage(client, auth, args.user_agent)
return reset_at, used, "api"
def set_priority(client, name, priority, reset_at=None, used=None, reason=None, dry_run=False):
payload = {"name": name, "priority": priority}
if reset_at is not None and used is not None:
payload["note"] = f"auto quota priority; reset_at={fmt_ts(reset_at)}; used={used}"
elif reason:
payload["note"] = f"auto quota priority failed: {reason}"
if dry_run:
logging.info("[dry-run] PATCH auth-fields payload=%s", payload)
return
client.api("PATCH", "/v0/management/auth-files/fields", json=payload)
def compute_priority(rank, args):
priority = args.max_priority - rank
if priority < args.min_priority:
priority = args.min_priority
return priority
def run(args):
client = Client(args.base_url, args.key)
usable = []
exhausted = []
failed = []
for auth in get_codex_auths(client):
name = auth["name"]
try:
reset_at, used, source = resolve_usage(client, auth, args)
row = {
"name": name,
"reset_at": reset_at,
"used": used,
"source": source,
}
if used >= 100:
exhausted.append(row)
logging.info("quota exhausted: name=%s used=%s reset_at=%s source=%s", name, used, fmt_ts(reset_at), source)
else:
usable.append(row)
except Exception as exc:
logging.exception("usage unavailable: name=%s", name)
failed.append({"name": name, "error": str(exc)})
usable.sort(key=lambda x: x["reset_at"])
logging.info("usable=%d exhausted=%d failed=%d", len(usable), len(exhausted), len(failed))
for rank, row in enumerate(usable):
priority = compute_priority(rank, args)
set_priority(client, row["name"], priority, row["reset_at"], row["used"], dry_run=args.dry_run)
logging.info(
"ranked rank=%d name=%s priority=%d reset_at=%s used=%s source=%s",
rank + 1,
row["name"],
priority,
fmt_ts(row["reset_at"]),
row["used"],
row["source"],
)
for row in exhausted:
set_priority(client, row["name"], args.exhausted_priority, row["reset_at"], row["used"], dry_run=args.dry_run)
logging.info("downgrade exhausted name=%s priority=%d used=%s", row["name"], args.exhausted_priority, row["used"])
for row in failed:
set_priority(client, row["name"], args.exhausted_priority, reason=row["error"], dry_run=args.dry_run)
logging.warning("downgrade failed name=%s priority=%d error=%s", row["name"], args.exhausted_priority, row["error"])
def main():
args = build_parser().parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(message)s",
)
if not args.key:
logging.error("missing management key, pass --key or set CPA_MANAGEMENT_KEY")
sys.exit(1)
logging.info("start codex quota priority sync base_url=%s refresh=%s dry_run=%s", args.base_url, args.refresh, args.dry_run)
run(args)
logging.info("finished")
if __name__ == "__main__":
main()
用法:
python3 codex_quota_priority.py --key '你的管理密钥'
强制重新请求 API、更新备注并重排:
python3 codex_quota_priority.py --key '你的管理密钥' --refresh
先看会做什么,不实际更新:
python3 codex_quota_priority.py --key '你的管理密钥' --refresh --dry-run --log-level DEBUG
配合 CPA 推荐:
routing:
strategy: "fill-first"
2 个帖子 - 2 位参与者