CPA优先使用周限额快到期且还有额度的账号

由于我自己本地CPA只有free了,50个,自己蹬基本满足了。 CPA的路由策略只有rr和ff。 然后 奥特曼告诉我不要浪费周限额,于是我就让codex写了个脚本,基本原理就是给周限额快重置且还有额度的账号设置一个高优先级优先使用。 默认优先用备注,备注缺失才请求 API。 --refresh :强...
CPA优先使用周限额快到期且还有额度的账号
CPA优先使用周限额快到期且还有额度的账号

:distorted_face:
由于我本地的 CPA 里只剩 free 账号了(共 50 个),自己用基本够了。

CPA 的路由策略只有 rr(round-robin)和 ff(fill-first)两种。

然后奥特曼告诉我不要浪费周限额,于是我就让 Codex 写了个脚本。基本原理是:给周限额即将重置且还有剩余额度的账号设置更高的优先级,让它们优先被使用。

  • 默认优先使用备注里缓存的 reset_at 和 used,备注缺失或无法解析时才请求 API。
  • --refresh:强制每个账号都请求 API,并更新备注。
  • --refresh-expired-note:备注里的 reset_at 已过期时重新请求 API;不加此参数时,过期的备注仍会被直接使用。
  • --dry-run:只打印计划,不写 priority。
  • --min-priority / --max-priority:控制优先级范围。
  • used >= 100 的账号自动降到 --exhausted-priority(默认 priority=0);获取用量失败的账号也会降到同一优先级。
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import re
import sys
from datetime import datetime, timezone

import requests


DEFAULT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"


def build_parser():
    """Create the command-line parser for the priority sync script.

    Several defaults are read from environment variables at call time:
    ``CPA_BASE_URL``, ``CPA_MANAGEMENT_KEY``, ``CODEX_USAGE_UA`` and
    ``LOG_LEVEL``.

    Returns:
        argparse.ArgumentParser: parser exposing the connection options, the
        refresh/dry-run switches and the integer priority bounds.
    """
    parser = argparse.ArgumentParser(
        description="Sync CLIProxyAPI Codex auth priorities by weekly quota reset time."
    )

    # Connection settings.
    parser.add_argument(
        "--base-url",
        default=os.getenv("CPA_BASE_URL", "http://127.0.0.1:8317"),
    )
    parser.add_argument(
        "--key",
        default=os.getenv("CPA_MANAGEMENT_KEY"),
        help="Management API key",
    )

    # Boolean switches, registered in the order they appear in --help.
    switches = (
        ("--refresh", "Always fetch quota from wham/usage and update notes"),
        ("--refresh-expired-note", "Fetch quota when cached note reset_at is expired"),
        ("--dry-run", "Print actions without PATCHing priorities"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    # Integer priority knobs with their defaults.
    priority_options = (
        ("--max-priority", 1000),
        ("--min-priority", 1),
        ("--exhausted-priority", 0),
    )
    for flag, default_value in priority_options:
        parser.add_argument(flag, type=int, default=default_value)

    parser.add_argument(
        "--user-agent",
        default=os.getenv("CODEX_USAGE_UA", DEFAULT_UA),
    )
    parser.add_argument(
        "--log-level",
        default=os.getenv("LOG_LEVEL", "INFO"),
    )
    return parser


class Client:
    """Small HTTP client for the management API used by this script.

    Every request goes through one ``requests.Session`` that carries the
    bearer key and JSON content headers.
    """

    # Class-level fallback so ``api`` still has a timeout on instances that
    # were created without running ``__init__``.
    timeout = 30

    def __init__(self, base_url, key, timeout=30):
        """Prepare the session.

        Args:
            base_url: Root URL of the management server; a trailing slash is
                stripped so paths can be appended directly.
            key: Management key sent as ``Authorization: Bearer <key>``.
            timeout: Default per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        })

    def api(self, method, path, **kwargs):
        """Send one request and return the decoded JSON response.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"PATCH"``.
            path: Path appended verbatim to ``base_url``.
            **kwargs: Forwarded to ``Session.request``. A ``timeout`` given
                here overrides the client default instead of colliding with it.

        Returns:
            Whatever ``resp.json()`` yields for the response.

        Raises:
            Whatever ``resp.raise_for_status()`` raises for a failed response
            (after a warning with the first 500 characters of the body is
            logged), or what ``resp.json()`` raises for an undecodable body.
        """
        # setdefault keeps a caller-supplied timeout; passing timeout= twice
        # to request() would raise TypeError.
        kwargs.setdefault("timeout", self.timeout)
        resp = self.session.request(method, self.base_url + path, **kwargs)
        if resp.status_code >= 400:
            logging.warning("request failed %s %s status=%s body=%s", method, path, resp.status_code, resp.text[:500])
        resp.raise_for_status()
        return resp.json()


def parse_time_to_epoch(value):
    """Convert a timestamp-like value to whole epoch seconds.

    Accepted inputs:
      * ``int`` / ``float`` - truncated with ``int()``.
      * digit-only strings such as ``"1700000000"``.
      * decimal strings such as ``"1700000000.5"`` (fraction is truncated).
      * ISO-8601 strings understood by ``datetime.fromisoformat``; a trailing
        ``Z`` is rewritten to ``+00:00`` first. A string without an offset is
        interpreted by ``astimezone`` as local time.

    Args:
        value: The raw value, or ``None``.

    Returns:
        int | None: Epoch seconds, or ``None`` when the value is missing,
        blank, or cannot be interpreted.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # NaN and +/-inf cannot be converted to int.
            return None

    s = str(value).strip()
    if not s:
        return None
    if s.isdigit():
        # str.isdigit() also accepts characters like superscript digits that
        # int() rejects, so the conversion itself must be guarded.
        try:
            return int(s)
        except ValueError:
            return None
    if re.fullmatch(r"[0-9]+\.[0-9]+", s):
        try:
            return int(float(s))
        except (ValueError, OverflowError):
            return None
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"

    try:
        return int(datetime.fromisoformat(s).astimezone(timezone.utc).timestamp())
    except (ValueError, OverflowError, OSError):
        # ValueError: not ISO-8601; OverflowError/OSError: date outside the
        # range the platform can convert.
        return None


def fmt_ts(ts):
    """Render epoch seconds as a local-time ISO-8601 string.

    Args:
        ts: Epoch seconds; anything ``int()`` accepts.

    Returns:
        str: Timestamp with second precision and the local UTC offset.
    """
    seconds = int(ts)
    utc_moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # astimezone() with no argument converts to the system local timezone.
    local_moment = utc_moment.astimezone()
    return local_moment.isoformat(timespec="seconds")


def parse_note(note):
    """Extract the cached quota fields from an auth note.

    The note is searched for ``reset_at=<value>`` (value runs up to the next
    ``;``) and ``used=<number>`` (an optional trailing ``%`` is ignored).

    Args:
        note: Note text, possibly empty or ``None``.

    Returns:
        tuple: ``(reset_at, used)`` where ``reset_at`` is whatever
        ``parse_time_to_epoch`` returns for the captured text and ``used`` is a
        float; each element is ``None`` when its field is not found.
    """
    if not note:
        return None, None

    reset_at = None
    used = None

    found = re.search(r"reset_at=([^;]+)", note)
    if found is not None:
        reset_at = parse_time_to_epoch(found.group(1).strip())

    found = re.search(r"used=([0-9]+(?:\.[0-9]+)?)%?", note)
    if found is not None:
        used = float(found.group(1))

    return reset_at, used


def get_codex_auths(client):
    """List the enabled Codex auth entries known to the management API.

    Calls ``GET /v0/management/auth-files`` and keeps entries whose ``type``
    is ``"codex"``, that are not ``disabled``, and that carry both a ``name``
    and an ``auth_index``.

    Args:
        client: Object with an ``api(method, path, **kwargs)`` method that
            returns the decoded JSON response.

    Returns:
        list: The raw auth dicts that passed the filters, in response order.
    """
    data = client.api("GET", "/v0/management/auth-files")
    auths = []

    # "files" may be absent or null; treat both as an empty listing.
    for item in data.get("files") or []:
        if item.get("type") != "codex":
            continue
        if item.get("disabled"):
            logging.info("skip disabled auth: %s", item.get("name"))
            continue
        auth_index = item.get("auth_index")
        # Compare against None/"" explicitly: a plain truthiness test would
        # also reject a numeric index of 0.
        if not item.get("name") or auth_index is None or auth_index == "":
            logging.warning("skip auth missing name/auth_index: %s", item)
            continue
        auths.append(item)

    logging.info("active codex auths=%d", len(auths))
    return auths


def fetch_usage(client, auth, user_agent):
    """Ask the upstream usage endpoint for one account's quota window.

    The request is relayed through ``POST /v0/management/api-call`` with the
    account's ``auth_index``; the ``$TOKEN$`` placeholder in the Authorization
    header is sent as-is to the management API.

    Args:
        client: Object with an ``api(method, path, **kwargs)`` method.
        auth: Auth dict; ``name`` and ``auth_index`` are required, ``account``
            or ``account_id`` is optional.
        user_agent: User-Agent header value for the upstream call.

    Returns:
        tuple: ``(reset_at, used)`` as ``(int, float)``.

    Raises:
        RuntimeError: The relayed ``status_code`` is not 200, or the body has
            no ``reset_at`` / ``used_percent``.
    """
    logging.info("fetch usage from upstream: %s", auth["name"])

    upstream_headers = {
        "Authorization": "Bearer $TOKEN$",
        "Content-Type": "application/json",
        "User-Agent": user_agent,
    }
    account_id = auth.get("account") or auth.get("account_id")
    if account_id:
        upstream_headers["Chatgpt-Account-Id"] = account_id

    request_spec = {
        "authIndex": auth["auth_index"],
        "method": "GET",
        "url": "https://chatgpt.com/backend-api/wham/usage",
        "header": upstream_headers,
    }
    reply = client.api("POST", "/v0/management/api-call", json=request_spec)

    status = reply.get("status_code")
    if status != 200:
        snippet = (reply.get("body") or "")[:300]
        raise RuntimeError(f"upstream status={status}, body={snippet}")

    # The relayed body is a JSON string; a missing body counts as an empty object.
    usage = json.loads(reply.get("body") or "{}")
    # NOTE(review): only primary_window is read - confirm it is the weekly
    # window for the account types in use.
    window = (usage.get("rate_limit") or {}).get("primary_window") or {}

    reset_at = window.get("reset_at")
    used = window.get("used_percent")
    if reset_at is None or used is None:
        raise RuntimeError("missing reset_at or used_percent in upstream usage body")

    return int(reset_at), float(used)


def resolve_usage(client, auth, args):
    """Decide where an account's quota data comes from and return it.

    With ``args.refresh`` the API is always queried. Otherwise the values
    cached in the auth's note are used when both are present, unless
    ``args.refresh_expired_note`` is set and the cached ``reset_at`` is not in
    the future. In every other case the API is queried.

    Args:
        client: Management API client passed through to ``fetch_usage``.
        auth: Auth dict; ``name`` is required, ``note`` is optional.
        args: Parsed options providing ``refresh``, ``refresh_expired_note``
            and ``user_agent``.

    Returns:
        tuple: ``(reset_at, used, source)`` with ``source`` being ``"note"``
        or ``"api"``.
    """
    name = auth["name"]

    if not args.refresh:
        cached_reset, cached_used = parse_note(auth.get("note") or "")
        now = int(datetime.now(timezone.utc).timestamp())

        if cached_reset is not None and cached_used is not None:
            expired = args.refresh_expired_note and cached_reset <= now
            if not expired:
                logging.info("use cached note: name=%s reset_at=%s used=%s", name, fmt_ts(cached_reset), cached_used)
                return cached_reset, cached_used, "note"
            logging.info("cached note expired, refreshing: name=%s reset_at=%s", name, fmt_ts(cached_reset))

    # Forced refresh, missing/unparseable note, or expired note.
    fresh_reset, fresh_used = fetch_usage(client, auth, args.user_agent)
    return fresh_reset, fresh_used, "api"


def set_priority(client, name, priority, reset_at=None, used=None, reason=None, dry_run=False):
    """Write a priority (and optionally a note) for one auth file.

    The note records ``reset_at``/``used`` when both are given; otherwise, if
    ``reason`` is non-empty, it records the failure reason. With neither, no
    note is sent.

    Args:
        client: Object with an ``api(method, path, **kwargs)`` method.
        name: Auth file name.
        priority: Priority value to store.
        reset_at: Epoch seconds of the quota reset, or ``None``.
        used: Used percentage, or ``None``.
        reason: Failure description used when quota data is unavailable.
        dry_run: When true, only log the payload instead of sending it.

    Returns:
        None
    """
    note = None
    if reset_at is not None and used is not None:
        note = f"auto quota priority; reset_at={fmt_ts(reset_at)}; used={used}"
    elif reason:
        note = f"auto quota priority failed: {reason}"

    payload = {"name": name, "priority": priority}
    if note is not None:
        payload["note"] = note

    if not dry_run:
        client.api("PATCH", "/v0/management/auth-files/fields", json=payload)
        return

    logging.info("[dry-run] PATCH auth-fields payload=%s", payload)


def compute_priority(rank, args):
    """Map a zero-based rank to a priority value.

    Rank 0 receives ``args.max_priority``; each further rank is one lower,
    never dropping below ``args.min_priority``.

    Args:
        rank: Zero-based position in the sorted list.
        args: Object with integer ``max_priority`` and ``min_priority``.

    Returns:
        int: The clamped priority.
    """
    return max(args.min_priority, args.max_priority - rank)


def run(args):
    """Rank the active Codex auths by quota reset time and write priorities.

    Steps:
      1. Resolve ``(reset_at, used)`` for every active auth and sort each auth
         into one of three buckets: usable, exhausted (``used >= 100``) or
         failed (any exception while resolving).
      2. Sort usable auths by ascending ``reset_at`` and assign priorities via
         ``compute_priority`` so the earliest reset gets the highest value.
      3. Set exhausted and failed auths to ``args.exhausted_priority``; failed
         auths get the error text stored as their note.

    Args:
        args: Parsed command-line options (see ``build_parser``).

    Returns:
        None
    """
    client = Client(args.base_url, args.key)

    usable = []
    exhausted = []
    failed = []

    for auth in get_codex_auths(client):
        name = auth["name"]
        try:
            reset_at, used, source = resolve_usage(client, auth, args)
            # Format the timestamp while still inside the try block: a
            # reset_at that cannot be rendered would otherwise only surface
            # in the write phase below and abort the sync for every
            # remaining account.
            reset_text = fmt_ts(reset_at)
            is_exhausted = used >= 100
        except Exception as exc:
            logging.exception("usage unavailable: name=%s", name)
            # Fall back to the exception type so the failure note is never
            # silently dropped for exceptions with an empty message.
            failed.append({"name": name, "error": str(exc) or type(exc).__name__})
            continue

        # Bucket only after everything that can raise has succeeded, so an
        # account never ends up in two buckets at once.
        row = {
            "name": name,
            "reset_at": reset_at,
            "used": used,
            "source": source,
        }
        if is_exhausted:
            exhausted.append(row)
            logging.info("quota exhausted: name=%s used=%s reset_at=%s source=%s", name, used, reset_text, source)
        else:
            usable.append(row)

    # Earliest reset first; list.sort is stable, so ties keep listing order.
    usable.sort(key=lambda x: x["reset_at"])

    logging.info("usable=%d exhausted=%d failed=%d", len(usable), len(exhausted), len(failed))

    for rank, row in enumerate(usable):
        priority = compute_priority(rank, args)
        set_priority(client, row["name"], priority, row["reset_at"], row["used"], dry_run=args.dry_run)
        logging.info(
            "ranked rank=%d name=%s priority=%d reset_at=%s used=%s source=%s",
            rank + 1,
            row["name"],
            priority,
            fmt_ts(row["reset_at"]),
            row["used"],
            row["source"],
        )

    for row in exhausted:
        set_priority(client, row["name"], args.exhausted_priority, row["reset_at"], row["used"], dry_run=args.dry_run)
        logging.info("downgrade exhausted name=%s priority=%d used=%s", row["name"], args.exhausted_priority, row["used"])

    for row in failed:
        set_priority(client, row["name"], args.exhausted_priority, reason=row["error"], dry_run=args.dry_run)
        logging.warning("downgrade failed name=%s priority=%d error=%s", row["name"], args.exhausted_priority, row["error"])


def main():
    """Script entry point: parse options, configure logging, run the sync.

    Exits with status 1 when no management key is available from ``--key`` or
    ``CPA_MANAGEMENT_KEY``.
    """
    args = build_parser().parse_args()

    # Unknown level names fall back to INFO.
    level = getattr(logging, args.log_level.upper(), logging.INFO)
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(message)s")

    if args.key:
        logging.info("start codex quota priority sync base_url=%s refresh=%s dry_run=%s", args.base_url, args.refresh, args.dry_run)
        run(args)
        logging.info("finished")
        return

    logging.error("missing management key, pass --key or set CPA_MANAGEMENT_KEY")
    sys.exit(1)


# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

用法:

python3 codex_quota_priority.py --key '你的管理密钥'

强制重新请求 API、更新备注并重排:

python3 codex_quota_priority.py --key '你的管理密钥' --refresh

先看会做什么,不实际更新:

python3 codex_quota_priority.py --key '你的管理密钥' --refresh --dry-run --log-level DEBUG

配合 CPA 使用时,推荐把路由策略设为 fill-first:

routing:
  strategy: "fill-first"

2 个帖子 - 2 位参与者

阅读完整话题

来源: LinuxDo 最新话题查看原文