codex手动抓回上下文,绕过compact失败

Codex 建立新会话之后,直接发送类似 `codex-session-dialog xxxx-xxx-xxxx --last 10` 的命令,就会获取指定 session 最后 10 轮对话的内容。完整脚本(以 `#!/usr/bin/env python3` 开头)见下文。
codex手动抓回上下文,绕过compact失败
codex手动抓回上下文,绕过compact失败

codex建立新会话之后,直接发送类似

codex-session-dialog xxxx-xxx-xxxx --last 10

就会获取指定 session 最后 10 轮对话的内容(每轮对话从一条用户消息开始)。

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import sqlite3
import sys
from datetime import datetime, timezone
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable


@dataclass
class DialogMessage:
    """One cleaned transcript entry extracted from a rollout file."""

    # Speaker of the message; the loader only keeps 'user' and 'assistant'.
    role: str
    # Message body after whitespace normalization.
    text: str


@dataclass
class SessionData:
    """Parsed contents of a single rollout jsonl file."""

    # 'id' from the session_meta payload; None when the file carries none.
    session_id: str | None
    # Retained dialog messages, in file order.
    messages: list[DialogMessage]
    # 'cwd' from the session_meta payload; None when the file carries none.
    cwd: str | None = None


@dataclass
class SessionMatch:
    """A rollout file selected for a workspace, plus metadata used for listing."""

    # Location of the rollout jsonl file.
    file_path: Path
    # Session id read from the file's first line, if any.
    session_id: str | None
    # Working directory read from the file's first line, if any.
    cwd: str | None
    # Last-update time in milliseconds taken from the threads table;
    # None when no usable value was found.
    sort_key: int | None = None
    # Thread title from the threads table; set by is_effectively_empty_thread.
    title: str | None = None


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the session dialog extractor.

    Returns:
        A parser with one positional ``target`` and the optional flags
        ``--codex-home``, ``--include-environment``, ``--last``, ``--list``
        and ``--show-path``.
    """
    cli = argparse.ArgumentParser(
        description='Extract a cleaned dialog transcript for a Codex session.'
    )
    # Options are declared as data and registered in this order, which is
    # also the order argparse uses when printing --help.
    option_specs = (
        (
            'target',
            {'help': 'Session id/prefix, a jsonl file, or a workspace directory'},
        ),
        (
            '--codex-home',
            {
                'default': str(Path.home() / '.codex'),
                'help': 'Codex home directory (default: ~/.codex)',
            },
        ),
        (
            '--include-environment',
            {
                'action': 'store_true',
                'help': 'Keep <environment_context> user messages',
            },
        ),
        (
            '--last',
            {
                'type': int,
                'default': None,
                'help': 'Only output the most recent N dialog blocks, split by each user message.',
            },
        ),
        (
            '--list',
            {
                'action': 'store_true',
                'help': 'List matching Codex sessions and their inferred titles for the given workspace path.',
            },
        ),
        (
            '--show-path',
            {
                'action': 'store_true',
                'help': 'Show rollout jsonl path in list output.',
            },
        ),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli


def normalize_text(text: str) -> str:
    """Tidy whitespace in a block of text.

    Line endings are unified to ``\\n``, trailing whitespace is removed from
    every line, runs of blank lines are reduced to a single blank line, and
    leading/trailing whitespace of the whole text is stripped.

    Args:
        text: Raw text, possibly with mixed line endings.

    Returns:
        The cleaned text.
    """
    unified = text.replace('\r\n', '\n').replace('\r', '\n')
    kept: list[str] = []
    for raw_line in unified.split('\n'):
        trimmed = raw_line.rstrip()
        # A blank line directly after another blank line would create three
        # or more consecutive newlines once joined, so it is dropped.
        if not trimmed and kept and not kept[-1]:
            continue
        kept.append(trimmed)
    return '\n'.join(kept).strip()


def normalize_path_string(path_str: str) -> str:
    """Return ``path_str`` as an absolute, user-expanded, resolved path string.

    The path does not have to exist (``strict=False``).
    """
    expanded = Path(path_str).expanduser()
    resolved = expanded.resolve(strict=False)
    return str(resolved)


def read_session_meta(path: Path) -> tuple[str | None, str | None]:
    """Read the session id and working directory from a rollout file's first line.

    The first line is parsed as a JSON object and its ``payload`` object is
    inspected for the ``id`` and ``cwd`` keys. The lookup is best-effort:
    any problem with the file or its shape yields ``None`` values instead of
    an exception, so a single bad file cannot abort a directory scan.

    Args:
        path: Rollout jsonl file to inspect.

    Returns:
        ``(session_id, cwd)``; each element is a non-empty string or ``None``.
    """
    try:
        with path.open(encoding='utf-8') as fh:
            first_line = fh.readline()
        record = json.loads(first_line)
    except (OSError, ValueError, RecursionError):
        # OSError: unreadable file. ValueError covers both invalid JSON
        # (JSONDecodeError) and invalid UTF-8 (UnicodeDecodeError).
        # RecursionError: pathologically nested JSON.
        return None, None

    payload = record.get('payload') if isinstance(record, dict) else None
    if not isinstance(payload, dict):
        # A missing or non-object payload carries no metadata; calling
        # .get() on e.g. a string payload would raise AttributeError.
        return None, None

    session_id = payload.get('id')
    cwd = payload.get('cwd')
    if not isinstance(session_id, str) or not session_id:
        session_id = None
    if not isinstance(cwd, str) or not cwd:
        cwd = None
    return session_id, cwd


def iter_rollout_files(codex_home: Path) -> list[Path]:
    """Collect every ``rollout-*.jsonl`` file below ``<codex_home>/sessions``.

    Args:
        codex_home: Codex home directory.

    Returns:
        All matching files found recursively, in sorted path order.

    Raises:
        FileNotFoundError: If the ``sessions`` directory does not exist.
    """
    sessions_root = codex_home / 'sessions'
    if sessions_root.exists():
        rollouts = list(sessions_root.rglob('rollout-*.jsonl'))
        rollouts.sort()
        return rollouts
    raise FileNotFoundError(f'sessions directory not found: {sessions_root}')


def _find_session_file_by_id(session: str, codex_home: Path) -> Path:
    """Locate the rollout file whose session id equals or starts with ``session``.

    Args:
        session: Full session id or a leading fragment of one.
        codex_home: Codex home directory to scan.

    Returns:
        The single matching rollout file. An exact id match takes priority
        over prefix matches.

    Raises:
        ValueError: If more than one exact match, or (with no exact match)
            more than one prefix match exists.
        FileNotFoundError: If nothing matches.
    """
    exact_hits: list[Path] = []
    prefix_hits: list[Path] = []
    for rollout in iter_rollout_files(codex_home):
        found_id = read_session_meta(rollout)[0]
        if found_id:
            if found_id == session:
                exact_hits.append(rollout)
            elif found_id.startswith(session):
                prefix_hits.append(rollout)

    # Exact ids are decided first; prefixes are only consulted without them.
    if exact_hits:
        if len(exact_hits) > 1:
            raise ValueError(f'multiple exact matches found for session {session}')
        return exact_hits[0]
    if not prefix_hits:
        raise FileNotFoundError(f'no session file found for {session}')
    if len(prefix_hits) > 1:
        raise ValueError(f'multiple prefix matches found for {session}')
    return prefix_hits[0]


def find_sessions_by_workspace(workspace: Path, codex_home: Path) -> list[SessionMatch]:
    """Find every rollout file whose recorded cwd resolves to ``workspace``.

    Args:
        workspace: Workspace directory to look up.
        codex_home: Codex home directory to scan.

    Returns:
        Matches ordered by ascending sort key (thread update time, or the
        file's modification time in milliseconds when no thread time is
        available), with the file path as tie-breaker.

    Raises:
        FileNotFoundError: If no session was recorded for the workspace.
    """
    wanted = normalize_path_string(str(workspace))
    found: list[SessionMatch] = []
    for rollout in iter_rollout_files(codex_home):
        session_id, cwd = read_session_meta(rollout)
        if cwd and normalize_path_string(cwd) == wanted:
            found.append(SessionMatch(file_path=rollout, session_id=session_id, cwd=cwd))
    if not found:
        raise FileNotFoundError(f'no Codex sessions found for workspace {workspace}')

    for entry in found:
        entry.sort_key = read_thread_sort_key(codex_home, entry)

    def ordering(entry: SessionMatch) -> tuple[int, str]:
        # Fall back to the file mtime (scaled to milliseconds) when the
        # thread table gave no timestamp.
        if entry.sort_key is not None:
            stamp = entry.sort_key
        else:
            stamp = int(entry.file_path.stat().st_mtime * 1000)
        return stamp, str(entry.file_path)

    found.sort(key=ordering)
    return found


def resolve_target(target: str, codex_home: Path) -> Path:
    """Turn the CLI ``target`` argument into a rollout jsonl file path.

    ``target`` is interpreted, in this order, as an existing file, an
    existing directory (a workspace), or a session id / id prefix.

    Args:
        target: File path, workspace directory, or session id/prefix.
        codex_home: Codex home directory used for workspace and id lookups.

    Returns:
        The rollout file to read. For a workspace this is the most recently
        updated session that is not effectively empty.

    Raises:
        ValueError: If ``target`` is a file without a ``.jsonl`` suffix, or
            the id lookup is ambiguous.
        FileNotFoundError: If no suitable session can be found.
    """
    candidate = Path(target).expanduser()
    if candidate.is_file():
        if candidate.suffix.lower() != '.jsonl':
            raise ValueError(f'not a jsonl file: {candidate}')
        return candidate
    if candidate.is_dir():
        # Matches come back sorted oldest to newest. Walking them newest
        # first and stopping at the first non-empty session avoids parsing
        # every rollout file, and keeps an unreadable older session from
        # blocking access to the newest one.
        for match in reversed(find_sessions_by_workspace(candidate, codex_home)):
            session_data = load_session_messages(match.file_path)
            if not is_effectively_empty_thread(codex_home, match, session_data):
                return match.file_path
        raise FileNotFoundError(f'no non-empty Codex sessions found for workspace {candidate}')
    return _find_session_file_by_id(target, codex_home)


def extract_text_parts(content_items: Iterable[dict]) -> str:
    """Join the textual pieces of a message's content items.

    For every dict item the ``text`` value is used when it is a non-blank
    string; otherwise the ``output`` value is used under the same condition.
    Non-dict items and items with neither are ignored.

    Args:
        content_items: Content entries of a message payload.

    Returns:
        The selected pieces joined by blank lines and normalized.
    """

    def pick(entry: dict) -> str | None:
        # 'text' wins over 'output'; only non-blank strings qualify.
        for field in ('text', 'output'):
            value = entry.get(field)
            if isinstance(value, str) and value.strip():
                return value
        return None

    chunks: list[str] = []
    for entry in content_items:
        if isinstance(entry, dict):
            chosen = pick(entry)
            if chosen is not None:
                chunks.append(chosen)
    return normalize_text('\n\n'.join(chunks))


def should_skip(role: str, text: str, include_environment: bool) -> bool:
    """Decide whether a message is left out of the transcript.

    Args:
        role: Message role from the payload.
        text: Extracted message text.
        include_environment: When false, messages starting with
            ``<environment_context>`` are dropped.

    Returns:
        True for roles other than user/assistant, for empty text, and for
        environment-context messages that were not requested.
    """
    if role in {'user', 'assistant'} and text:
        # Only a keepable role with real text can still be rejected, and
        # then solely for being an unrequested environment block.
        return not include_environment and text.startswith('<environment_context>')
    return True


def load_session_messages(path: Path, include_environment: bool = False) -> SessionData:
    """Parse a rollout jsonl file into session metadata and dialog messages.

    ``session_meta`` rows supply the session id and cwd. ``response_item``
    rows whose payload type is ``message`` supply the dialog; each is kept
    unless ``should_skip`` rejects it or it repeats the previous kept
    message (same role and text).

    Lines that cannot contribute are skipped rather than aborting the load:
    blank lines, lines that are not valid JSON (for example a record that
    was only partially written), rows that are not JSON objects, and rows
    whose payload is not an object.

    Args:
        path: Rollout jsonl file to read.
        include_environment: Keep ``<environment_context>`` user messages.

    Returns:
        The collected session data.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    session_id: str | None = None
    session_cwd: str | None = None
    messages: list[DialogMessage] = []
    last_key: tuple[str, str] | None = None

    with path.open(encoding='utf-8') as fh:
        for line in fh:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                # A damaged record should cost one line, not the transcript.
                continue
            if not isinstance(row, dict):
                continue
            payload = row.get('payload')
            if not isinstance(payload, dict):
                continue

            row_type = row.get('type')
            if row_type == 'session_meta':
                value = payload.get('id')
                cwd = payload.get('cwd')
                if isinstance(value, str) and value:
                    session_id = value
                if isinstance(cwd, str) and cwd:
                    session_cwd = cwd
                continue

            if row_type != 'response_item' or payload.get('type') != 'message':
                continue

            role = payload.get('role')
            if not isinstance(role, str):
                # A non-string role can never be user/assistant, and an
                # unhashable one would break the set lookup in should_skip.
                continue
            content = payload.get('content')
            text = extract_text_parts(content if isinstance(content, list) else [])
            if should_skip(role, text, include_environment):
                continue

            # extract_text_parts already returns normalized text, so it can
            # be compared directly to detect an immediate repeat.
            key = (role, text)
            if key == last_key:
                continue
            last_key = key
            messages.append(DialogMessage(role=role, text=text))

    return SessionData(session_id=session_id, messages=messages, cwd=session_cwd)


def group_dialogs(messages: list[DialogMessage]) -> list[list[DialogMessage]]:
    """Split messages into dialog blocks, each opened by a user message.

    Args:
        messages: Messages in chronological order.

    Returns:
        One list per user message, holding that message followed by every
        non-user message up to the next user message. Messages that appear
        before the first user message are not included.
    """
    dialogs: list[list[DialogMessage]] = []
    for message in messages:
        if message.role == 'user':
            dialogs.append([message])
        elif dialogs:
            # Attach replies to the block opened by the latest user message.
            dialogs[-1].append(message)
    return dialogs


def infer_title(session_data: SessionData, path: Path) -> str:
    """Derive a display title for a session.

    Args:
        session_data: Loaded session.
        path: Rollout file the session came from.

    Returns:
        The first line of the first user message with text, or the file
        name without its suffix when there is no such message.
    """
    first_user = next(
        (entry for entry in session_data.messages if entry.role == 'user' and entry.text),
        None,
    )
    if first_user is None:
        return path.stem
    return first_user.text.splitlines()[0]


def read_thread_row(codex_home: Path, match: SessionMatch, session_data: SessionData | None = None) -> sqlite3.Row | None:
    """Fetch the ``threads`` row describing a session from ``state_5.sqlite``.

    The row is looked up by rollout path first. If that finds nothing, it is
    looked up by session id, preferring the id in ``session_data`` over the
    one in ``match``. The lookup is best-effort: a missing database or any
    SQLite error results in ``None``.

    Args:
        codex_home: Codex home directory containing ``state_5.sqlite``.
        match: Session whose row is wanted.
        session_data: Optional loaded session supplying a session id.

    Returns:
        The matching row, or ``None`` when unavailable.
    """
    db_path = codex_home / 'state_5.sqlite'
    if not db_path.exists():
        return None
    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.Error:
        return None
    try:
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "select * from threads where rollout_path = ? limit 1",
            (str(match.file_path),),
        ).fetchone()
        if row is None:
            sid = None
            if session_data and session_data.session_id:
                sid = session_data.session_id
            elif match.session_id:
                sid = match.session_id
            if sid:
                row = conn.execute(
                    "select * from threads where id = ? limit 1",
                    (sid,),
                ).fetchone()
        return row
    except sqlite3.Error:
        # E.g. the threads table or one of the queried columns is absent.
        return None
    finally:
        # Close on every path so a failing query cannot leak the connection.
        conn.close()


def read_thread_sort_key(codex_home: Path, match: SessionMatch) -> int | None:
    """Return a session's last-update time in milliseconds from its thread row.

    Args:
        codex_home: Codex home directory.
        match: Session to look up.

    Returns:
        The integer ``updated_at_ms`` value when present; otherwise the
        integer ``updated_at`` value multiplied by 1000; otherwise ``None``.
    """
    record = read_thread_row(codex_home, match)
    if record is None:
        return None
    columns = record.keys()

    millis = record['updated_at_ms'] if 'updated_at_ms' in columns else None
    if isinstance(millis, int):
        return millis

    seconds = record['updated_at'] if 'updated_at' in columns else None
    if isinstance(seconds, int):
        return seconds * 1000
    return None


def read_resume_title(codex_home: Path, match: SessionMatch, session_data: SessionData) -> str | None:
    """Return the title stored for a session in the threads table.

    Args:
        codex_home: Codex home directory.
        match: Session to look up.
        session_data: Loaded session, used as a source for the session id.

    Returns:
        The non-empty string title, or ``None`` when there is no thread row,
        the row has no ``title`` column, or the value is empty or not a string.
    """
    row = read_thread_row(codex_home, match, session_data)
    # Indexing a sqlite3.Row by an absent column name raises IndexError, so
    # check for the column first, as read_thread_sort_key does for its columns.
    if row is None or 'title' not in row.keys():
        return None
    title = row['title']
    return title if isinstance(title, str) and title else None


def format_sort_key(sort_key: int | None, fallback_path: Path) -> str:
    """Format a session timestamp as ``YYYY-MM-DD HH:MM:SSZ`` in UTC.

    Args:
        sort_key: Timestamp in milliseconds, or ``None``.
        fallback_path: File whose modification time is used when
            ``sort_key`` is ``None``.

    Returns:
        The formatted UTC timestamp.
    """
    if sort_key is None:
        epoch_seconds = fallback_path.stat().st_mtime
    else:
        epoch_seconds = sort_key / 1000
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%SZ')


def truncate_title(title: str, max_len: int = 60) -> str:
    """Shorten ``title`` to at most ``max_len`` characters.

    Args:
        title: Text to shorten.
        max_len: Maximum length of the result.

    Returns:
        ``title`` unchanged when it fits. Otherwise its leading characters
        followed by ``'...'``, or, when ``max_len`` is below 3 and leaves no
        room for the ellipsis, just its first ``max_len`` characters (an
        empty string for ``max_len <= 0``).
    """
    if len(title) <= max_len:
        return title
    if max_len < 3:
        # With a budget below the ellipsis length, ``max_len - 3`` would be
        # a negative slice end and yield a result longer than ``max_len``.
        return title[: max(max_len, 0)]
    return title[: max_len - 3] + '...'


def is_effectively_empty_thread(codex_home: Path, match: SessionMatch, session_data: SessionData) -> bool:
    """Tell whether a session has neither a stored title nor any messages.

    When a stored title exists it is also recorded on ``match.title``.

    Args:
        codex_home: Codex home directory.
        match: Session being checked; its ``title`` may be updated.
        session_data: Loaded messages of the session.

    Returns:
        False when a stored title exists; otherwise True exactly when the
        session has no messages.
    """
    stored_title = read_resume_title(codex_home, match, session_data)
    if not stored_title:
        return not session_data.messages
    match.title = stored_title
    return False


def render_list_output(target_path: Path, codex_home: Path, include_environment: bool, show_path: bool = False) -> str:
    """Render the ``--list`` report for a workspace directory or a single file.

    Args:
        target_path: Workspace directory, or one rollout file.
        codex_home: Codex home directory.
        include_environment: Keep ``<environment_context>`` messages when
            loading sessions.
        show_path: Append each session's rollout path to its line.

    Returns:
        A header line followed by one ``updated | session id | title`` line
        per session that is not effectively empty, ending with a newline.

    Raises:
        FileNotFoundError: If ``target_path`` is neither a file nor a
            directory, or the workspace has no sessions.
    """
    if target_path.is_file():
        candidates = [SessionMatch(file_path=target_path, session_id=None, cwd=None)]
    elif target_path.is_dir():
        candidates = find_sessions_by_workspace(target_path, codex_home)
    else:
        raise FileNotFoundError(f'path not found: {target_path}')

    report = [f'# workspace: {target_path}', '']
    for candidate in candidates:
        loaded = load_session_messages(candidate.file_path, include_environment=include_environment)
        if is_effectively_empty_thread(codex_home, candidate, loaded):
            continue
        # The emptiness check above stores the thread title on the candidate
        # when one exists; otherwise the title comes from the messages.
        shown_title = truncate_title(candidate.title or infer_title(loaded, candidate.file_path))
        shown_id = loaded.session_id or candidate.session_id or '-'
        columns = [format_sort_key(candidate.sort_key, candidate.file_path), shown_id, shown_title]
        if show_path:
            columns.append(str(candidate.file_path))
        report.append(' | '.join(columns))
    return '\n'.join(report).rstrip() + '\n'


def render_output(
    session_id: str | None,
    path: Path,
    messages: list[DialogMessage],
    last: int | None = None,
) -> str:
    """Render a session transcript as text.

    Args:
        session_id: Session id for the header; the file stem is used when
            it is empty or ``None``.
        path: Rollout file the messages came from.
        messages: Messages to render.
        last: When given, output only the final ``last`` dialog blocks, each
            under a ``## dialog N`` heading; 0 outputs the header only.

    Returns:
        The header plus ``role: text`` lines, ending with a single newline.

    Raises:
        ValueError: If ``last`` is negative.
    """

    def finish(rows: list[str]) -> str:
        return '\n'.join(rows).rstrip() + '\n'

    header = [f'# session: {session_id or path.stem}', f'# file: {path}', '']

    if last is None:
        return finish(header + [f'{entry.role}: {entry.text}' for entry in messages])

    dialogs = group_dialogs(messages)
    if last < 0:
        raise ValueError('--last must be >= 0')
    # dialogs[-0:] would select everything, so zero is handled explicitly.
    chosen = dialogs[-last:] if last else []

    rows = header
    for number, dialog in enumerate(chosen, 1):
        rows.append(f'## dialog {number}')
        rows.extend(f'{entry.role}: {entry.text}' for entry in dialog)
        rows.append('')
    return finish(rows)


def main() -> int:
    """Run the command-line tool.

    Parses the arguments, then prints either the session list (``--list``)
    or the transcript of the resolved session to stdout.

    Returns:
        0 on success; 1 after printing ``error: ...`` to stderr when any
        step raised an exception.
    """
    args = build_parser().parse_args()

    try:
        codex_home = Path(args.codex_home).expanduser()
        if args.list:
            output = render_list_output(
                Path(args.target).expanduser(),
                codex_home=codex_home,
                include_environment=args.include_environment,
                show_path=args.show_path,
            )
        else:
            session_file = resolve_target(args.target, codex_home)
            loaded = load_session_messages(
                session_file, include_environment=args.include_environment
            )
            output = render_output(
                loaded.session_id,
                session_file,
                loaded.messages,
                last=args.last,
            )
    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: report any failure as a one-line message.
        print(f'error: {exc}', file=sys.stderr)
        return 1
    else:
        sys.stdout.write(output)
        return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main())

1 个帖子 - 1 位参与者

阅读完整话题

来源: LinuxDo 最新话题查看原文