在 Codex 中建立新会话之后,直接发送类似下面的命令:
codex-session-dialog xxxx-xxx-xxxx --last 10
就会获取指定 session 最近 10 轮对话(按每条用户消息划分)的内容。
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sqlite3
import sys
from datetime import datetime, timezone
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
@dataclass
class DialogMessage:
    """A single transcript entry: the speaker's role and the message text."""

    # Speaker role as read from the rollout row (e.g. 'user' or 'assistant').
    role: str
    # Message body text.
    text: str
@dataclass
class SessionData:
    """Everything extracted from one rollout file."""

    # Session id taken from the session_meta row, or None when absent.
    session_id: str | None
    # Dialog messages in file order.
    messages: list[DialogMessage]
    # Working directory recorded in the session_meta row, if any.
    cwd: str | None = None
@dataclass
class SessionMatch:
    """A rollout file selected as a candidate session, plus lazily filled metadata."""

    # Location of the rollout jsonl file.
    file_path: Path
    # Session id read from the file's first row, or None.
    session_id: str | None
    # Working directory read from the file's first row, or None.
    cwd: str | None
    # Ordering timestamp in milliseconds; filled in after construction.
    sort_key: int | None = None
    # Stored thread title; filled in after construction when one exists.
    title: str | None = None
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser.

    Returns:
        A parser with one positional ``target`` and the options
        ``--codex-home``, ``--include-environment``, ``--last``, ``--list``
        and ``--show-path``.
    """
    cli = argparse.ArgumentParser(
        description='Extract a cleaned dialog transcript for a Codex session.'
    )
    default_home = str(Path.home() / '.codex')

    # Declared in display order; each entry is (flags, add_argument kwargs).
    argument_specs = [
        (
            ('target',),
            {'help': 'Session id/prefix, a jsonl file, or a workspace directory'},
        ),
        (
            ('--codex-home',),
            {
                'default': default_home,
                'help': 'Codex home directory (default: ~/.codex)',
            },
        ),
        (
            ('--include-environment',),
            {
                'action': 'store_true',
                'help': 'Keep <environment_context> user messages',
            },
        ),
        (
            ('--last',),
            {
                'type': int,
                'default': None,
                'help': 'Only output the most recent N dialog blocks, split by each user message.',
            },
        ),
        (
            ('--list',),
            {
                'action': 'store_true',
                'help': 'List matching Codex sessions and their inferred titles for the given workspace path.',
            },
        ),
        (
            ('--show-path',),
            {
                'action': 'store_true',
                'help': 'Show rollout jsonl path in list output.',
            },
        ),
    ]
    for flags, settings in argument_specs:
        cli.add_argument(*flags, **settings)
    return cli
def normalize_text(text: str) -> str:
    """Return *text* with uniform newlines and tidy whitespace.

    CRLF and lone CR become LF, trailing whitespace is removed from every
    line, the whole result is stripped, and runs of blank lines are reduced
    to a single blank line.
    """
    unified = text.replace('\r\n', '\n').replace('\r', '\n')
    trimmed = '\n'.join(piece.rstrip() for piece in unified.split('\n')).strip()

    kept: list[str] = []
    previous_blank = False
    for piece in trimmed.split('\n'):
        is_blank = piece == ''
        # Two or more consecutive blank lines collapse into one.
        if is_blank and previous_blank:
            continue
        kept.append(piece)
        previous_blank = is_blank
    return '\n'.join(kept)
def normalize_path_string(path_str: str) -> str:
    """Return *path_str* as a user-expanded, resolved path string.

    The path does not have to exist (``strict=False``).
    """
    expanded = Path(path_str).expanduser()
    resolved = expanded.resolve(strict=False)
    return str(resolved)
def read_session_meta(path: Path) -> tuple[str | None, str | None]:
    """Read the session id and working directory from a rollout file's first line.

    This is a best-effort reader: an unreadable file, a first line that is
    not valid JSON, or a first row / payload that is not a JSON object all
    yield ``(None, None)`` instead of raising.

    Args:
        path: Rollout jsonl file to inspect.

    Returns:
        ``(session_id, cwd)``. Each element is None unless the payload holds
        a non-empty string under ``id`` / ``cwd`` respectively.
    """
    try:
        with path.open(encoding='utf-8') as fh:
            first_line = fh.readline()
        header = json.loads(first_line)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None, None

    payload = header.get('payload') if isinstance(header, dict) else None
    if not isinstance(payload, dict):
        # A non-object payload (string, list, number, null) carries no metadata.
        return None, None

    session_id = payload.get('id')
    cwd = payload.get('cwd')
    if not isinstance(session_id, str) or not session_id:
        session_id = None
    if not isinstance(cwd, str) or not cwd:
        cwd = None
    return session_id, cwd
def iter_rollout_files(codex_home: Path) -> list[Path]:
    """Return every ``rollout-*.jsonl`` file under ``<codex_home>/sessions``, sorted.

    Raises:
        FileNotFoundError: If the ``sessions`` directory does not exist.
    """
    sessions_root = codex_home / 'sessions'
    if sessions_root.exists():
        rollouts = list(sessions_root.rglob('rollout-*.jsonl'))
        rollouts.sort()
        return rollouts
    raise FileNotFoundError(f'sessions directory not found: {sessions_root}')
def _find_session_file_by_id(session: str, codex_home: Path) -> Path:
    """Locate the rollout file whose session id equals or starts with *session*.

    An exact id match takes priority over prefix matches.

    Raises:
        ValueError: If several files match exactly, or several match by
            prefix with no exact match.
        FileNotFoundError: If no file matches at all.
    """
    exact: list[Path] = []
    by_prefix: list[Path] = []
    for rollout in iter_rollout_files(codex_home):
        found_id = read_session_meta(rollout)[0]
        if not found_id:
            continue
        if found_id == session:
            exact.append(rollout)
        elif found_id.startswith(session):
            by_prefix.append(rollout)

    if exact:
        if len(exact) > 1:
            raise ValueError(f'multiple exact matches found for session {session}')
        return exact[0]
    if not by_prefix:
        raise FileNotFoundError(f'no session file found for {session}')
    if len(by_prefix) > 1:
        raise ValueError(f'multiple prefix matches found for {session}')
    return by_prefix[0]
def find_sessions_by_workspace(workspace: Path, codex_home: Path) -> list[SessionMatch]:
    """Collect the sessions whose recorded cwd is *workspace*, oldest first.

    Paths are compared after normalisation. Each match receives a
    ``sort_key`` from the thread database when one is available; otherwise
    the rollout file's mtime (in milliseconds) is used for ordering only.

    Raises:
        FileNotFoundError: If no session was recorded for the workspace.
    """
    wanted = normalize_path_string(str(workspace))
    found: list[SessionMatch] = []
    for rollout in iter_rollout_files(codex_home):
        sid, recorded_cwd = read_session_meta(rollout)
        if recorded_cwd and normalize_path_string(recorded_cwd) == wanted:
            found.append(SessionMatch(file_path=rollout, session_id=sid, cwd=recorded_cwd))

    if not found:
        raise FileNotFoundError(f'no Codex sessions found for workspace {workspace}')

    for entry in found:
        entry.sort_key = read_thread_sort_key(codex_home, entry)

    def ordering(entry: SessionMatch) -> tuple[int, str]:
        stamp = entry.sort_key
        if stamp is None:
            stamp = int(entry.file_path.stat().st_mtime * 1000)
        # The path string breaks ties between equal timestamps.
        return stamp, str(entry.file_path)

    found.sort(key=ordering)
    return found
def resolve_target(target: str, codex_home: Path) -> Path:
    """Turn the CLI target into the path of a rollout jsonl file.

    * An existing file must have a ``.jsonl`` suffix and is returned as is.
    * An existing directory is treated as a workspace: the last non-empty
      session in the workspace's sorted match list is returned.
    * Anything else is looked up as a session id or id prefix.

    Raises:
        ValueError: If *target* is a file without a ``.jsonl`` suffix.
        FileNotFoundError: If the workspace has no non-empty session.
    """
    candidate = Path(target).expanduser()
    if candidate.is_file():
        if candidate.suffix.lower() == '.jsonl':
            return candidate
        raise ValueError(f'not a jsonl file: {candidate}')

    if not candidate.is_dir():
        return _find_session_file_by_id(target, codex_home)

    usable = [
        match
        for match in find_sessions_by_workspace(candidate, codex_home)
        if not is_effectively_empty_thread(
            codex_home, match, load_session_messages(match.file_path)
        )
    ]
    if usable:
        return usable[-1].file_path
    raise FileNotFoundError(f'no non-empty Codex sessions found for workspace {candidate}')
def extract_text_parts(content_items: Iterable[dict]) -> str:
    """Join the textual pieces of a message's content items into one string.

    For each dict item the ``text`` field is preferred and ``output`` is the
    fallback; a field counts only when it is a non-blank string. Non-dict
    items are ignored. Pieces are separated by a blank line and the result
    is normalised.
    """

    def pick(entry: dict) -> str | None:
        for field_name in ('text', 'output'):
            value = entry.get(field_name)
            if isinstance(value, str) and value.strip():
                return value
        return None

    chunks: list[str] = []
    for entry in content_items:
        if isinstance(entry, dict):
            chosen = pick(entry)
            if chosen is not None:
                chunks.append(chosen)
    return normalize_text('\n\n'.join(chunks))
def should_skip(role: str, text: str, include_environment: bool) -> bool:
    """Tell whether a message should be left out of the transcript.

    A message is skipped when its role is neither ``user`` nor
    ``assistant``, when its text is empty, or when it is an
    ``<environment_context>`` message and those were not requested.
    """
    is_dialog_role = role in {'user', 'assistant'}
    if not (is_dialog_role and text):
        return True
    hidden_environment = text.startswith('<environment_context>') and not include_environment
    return hidden_environment
def load_session_messages(path: Path, include_environment: bool = False) -> SessionData:
    """Parse a rollout jsonl file into session metadata and dialog messages.

    ``session_meta`` rows provide the session id and cwd. ``response_item``
    rows whose payload type is ``message`` provide the dialog. A message
    identical (same role and text) to the one just kept is dropped.

    Rows that cannot be used are skipped rather than aborting the load:
    blank lines, lines that are not valid JSON (for example a partially
    written last line), rows or payloads that are not JSON objects, and
    messages whose role is not a string.

    Args:
        path: Rollout jsonl file to read.
        include_environment: Keep ``<environment_context>`` messages.

    Returns:
        The collected ``SessionData``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    session_id: str | None = None
    session_cwd: str | None = None
    messages: list[DialogMessage] = []
    last_key: tuple[str, str] | None = None

    with path.open(encoding='utf-8') as fh:
        for line in fh:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except ValueError:
                # json.JSONDecodeError is a ValueError; ignore unparsable lines.
                continue
            if not isinstance(row, dict):
                continue

            payload = row.get('payload')
            if not isinstance(payload, dict):
                continue

            row_type = row.get('type')
            if row_type == 'session_meta':
                value = payload.get('id')
                cwd = payload.get('cwd')
                if isinstance(value, str) and value:
                    session_id = value
                if isinstance(cwd, str) and cwd:
                    session_cwd = cwd
                continue

            if row_type != 'response_item' or payload.get('type') != 'message':
                continue

            role = payload.get('role')
            if not isinstance(role, str):
                # An unhashable role (e.g. a list) would break the role check.
                continue

            content = payload.get('content')
            text = extract_text_parts(content if isinstance(content, list) else [])
            if should_skip(role, text, include_environment):
                continue

            key = (role, normalize_text(text))
            if key == last_key:
                continue
            last_key = key
            messages.append(DialogMessage(role=role, text=text))

    return SessionData(session_id=session_id, messages=messages, cwd=session_cwd)
def group_dialogs(messages: list[DialogMessage]) -> list[list[DialogMessage]]:
    """Split *messages* into blocks that each begin with a user message.

    Every user message opens a new block and the following non-user messages
    join it. Messages that appear before the first user message are dropped.
    """
    blocks: list[list[DialogMessage]] = []
    for entry in messages:
        if entry.role == 'user':
            blocks.append([entry])
        elif blocks:
            blocks[-1].append(entry)
    return blocks
def infer_title(session_data: SessionData, path: Path) -> str:
    """Derive a title from the first line of the first non-empty user message.

    Falls back to the file stem of *path* when there is no such message.
    """
    first_user_text = next(
        (
            entry.text
            for entry in session_data.messages
            if entry.role == 'user' and entry.text
        ),
        None,
    )
    if first_user_text is None:
        return path.stem
    return first_user_text.splitlines()[0]
def read_thread_row(codex_home: Path, match: SessionMatch, session_data: SessionData | None = None) -> sqlite3.Row | None:
    """Fetch the ``threads`` row for a session from ``state_5.sqlite``.

    The row is looked up by rollout path first. If that finds nothing, it is
    looked up by session id, preferring ``session_data.session_id`` over
    ``match.session_id``. This is a best-effort lookup: a missing database
    file or any SQLite error yields None.

    Args:
        codex_home: Directory expected to contain ``state_5.sqlite``.
        match: Session whose rollout path / id is used for the lookup.
        session_data: Optional parsed session providing a preferred id.

    Returns:
        The matching row, or None when nothing was found or the lookup failed.
    """
    db_path = codex_home / 'state_5.sqlite'
    if not db_path.exists():
        return None

    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.Error:
        return None

    try:
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "select * from threads where rollout_path = ? limit 1",
            (str(match.file_path),),
        ).fetchone()
        if row is None:
            sid = None
            if session_data and session_data.session_id:
                sid = session_data.session_id
            elif match.session_id:
                sid = match.session_id
            if sid:
                row = conn.execute(
                    "select * from threads where id = ? limit 1",
                    (sid,),
                ).fetchone()
        return row
    except sqlite3.Error:
        return None
    finally:
        # Close on every path so a failed query does not leak the connection.
        conn.close()
def read_thread_sort_key(codex_home: Path, match: SessionMatch) -> int | None:
    """Return the thread's last-update time in milliseconds, if recorded.

    An integer ``updated_at_ms`` column is used as is. Otherwise an integer
    ``updated_at`` column is multiplied by 1000. None is returned when there
    is no row or neither column holds an integer.
    """
    record = read_thread_row(codex_home, match)
    if record is None:
        return None

    columns = record.keys()
    millis = record['updated_at_ms'] if 'updated_at_ms' in columns else None
    if isinstance(millis, int):
        return millis

    seconds = record['updated_at'] if 'updated_at' in columns else None
    return seconds * 1000 if isinstance(seconds, int) else None
def read_resume_title(codex_home: Path, match: SessionMatch, session_data: SessionData) -> str | None:
    """Return the title stored for the session's thread row, if any.

    Args:
        codex_home: Codex home directory passed on to the row lookup.
        match: Session to look up.
        session_data: Parsed session, used as an additional source of the id.

    Returns:
        The ``title`` column when it is a non-empty string; None when there
        is no row, the row has no ``title`` column, or the value is empty or
        not a string.
    """
    row = read_thread_row(codex_home, match, session_data)
    # Check the column first: indexing a Row by a missing name raises IndexError.
    if row is None or 'title' not in row.keys():
        return None
    title = row['title']
    return title if isinstance(title, str) and title else None
def format_sort_key(sort_key: int | None, fallback_path: Path) -> str:
    """Format a millisecond timestamp as ``YYYY-MM-DD HH:MM:SSZ`` in UTC.

    When *sort_key* is None the modification time of *fallback_path* is
    formatted instead.
    """
    if sort_key is None:
        epoch_seconds = fallback_path.stat().st_mtime
    else:
        epoch_seconds = sort_key / 1000
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%SZ')
def truncate_title(title: str, max_len: int = 60) -> str:
    """Shorten *title* to at most *max_len* characters.

    Titles that already fit are returned unchanged. Longer titles are cut
    and end with ``...``. When *max_len* is too small to hold the ellipsis
    (below 3) the title is simply cut to *max_len* characters, so the result
    never exceeds the limit; a negative limit yields an empty string.

    Args:
        title: Text to shorten.
        max_len: Maximum length of the returned string.

    Returns:
        The possibly shortened title.
    """
    if len(title) <= max_len:
        return title
    ellipsis = '...'
    if max_len < len(ellipsis):
        # A negative slice bound would cut from the end and overshoot max_len.
        return title[: max(max_len, 0)]
    return title[: max_len - len(ellipsis)] + ellipsis
def is_effectively_empty_thread(codex_home: Path, match: SessionMatch, session_data: SessionData) -> bool:
    """Tell whether a session has neither a stored title nor any messages.

    Side effect: when a stored title exists it is written to ``match.title``.
    """
    stored_title = read_resume_title(codex_home, match, session_data)
    if not stored_title:
        return not session_data.messages
    match.title = stored_title
    return False
def render_list_output(target_path: Path, codex_home: Path, include_environment: bool, show_path: bool = False) -> str:
    """Render one line per non-empty session for a workspace or a single file.

    Each line reads ``<updated> | <session id> | <title>``, with the rollout
    path appended when *show_path* is set. The title is the stored thread
    title when available, otherwise one inferred from the messages, and is
    truncated for display.

    Raises:
        FileNotFoundError: If *target_path* is neither a file nor a directory.
    """
    if target_path.is_file():
        candidates = [SessionMatch(file_path=target_path, session_id=None, cwd=None)]
    elif target_path.is_dir():
        candidates = find_sessions_by_workspace(target_path, codex_home)
    else:
        raise FileNotFoundError(f'path not found: {target_path}')

    rows = [f'# workspace: {target_path}', '']
    for entry in candidates:
        data = load_session_messages(entry.file_path, include_environment=include_environment)
        if is_effectively_empty_thread(codex_home, entry, data):
            continue
        shown_title = truncate_title(entry.title or infer_title(data, entry.file_path))
        shown_id = data.session_id or entry.session_id or '-'
        stamp = format_sort_key(entry.sort_key, entry.file_path)
        fields = [stamp, shown_id, shown_title]
        if show_path:
            fields.append(str(entry.file_path))
        rows.append(' | '.join(fields))
    return '\n'.join(rows).rstrip() + '\n'
def render_output(
    session_id: str | None,
    path: Path,
    messages: list[DialogMessage],
    last: int | None = None,
) -> str:
    """Render the transcript of one session as text.

    The output begins with ``# session`` and ``# file`` header lines. With
    ``last=None`` every message is printed as ``role: text``. Otherwise the
    messages are grouped into dialog blocks and only the final *last* blocks
    are printed, each under a ``## dialog N`` heading numbered from 1.

    Raises:
        ValueError: If *last* is negative.
    """
    header_id = session_id or path.stem
    out = [f'# session: {header_id}', f'# file: {path}', '']

    def as_lines(entries: list[DialogMessage]) -> list[str]:
        return [f'{entry.role}: {entry.text}' for entry in entries]

    if last is None:
        out.extend(as_lines(messages))
    else:
        dialogs = group_dialogs(messages)
        if last < 0:
            raise ValueError('--last must be >= 0')
        # A zero count selects nothing; dialogs[-0:] would select everything.
        chosen = dialogs[-last:] if last else []
        for number, block in enumerate(chosen, start=1):
            out.append(f'## dialog {number}')
            out.extend(as_lines(block))
            out.append('')
    return '\n'.join(out).rstrip() + '\n'
def main() -> int:
    """Run the command-line tool.

    Parses the arguments, then either lists sessions (``--list``) or prints
    the transcript of the resolved session to stdout.

    Returns:
        0 on success; 1 after printing ``error: ...`` to stderr on failure.
    """
    args = build_parser().parse_args()
    try:
        codex_home = Path(args.codex_home).expanduser()
        target_path = Path(args.target).expanduser()
        if not args.list:
            path = resolve_target(args.target, codex_home)
            session_data = load_session_messages(
                path, include_environment=args.include_environment
            )
            output = render_output(
                session_data.session_id,
                path,
                session_data.messages,
                last=args.last,
            )
        else:
            output = render_list_output(
                target_path,
                codex_home=codex_home,
                include_environment=args.include_environment,
                show_path=args.show_path,
            )
    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: report any failure as a one-line error.
        print(f'error: {exc}', file=sys.stderr)
        return 1
    sys.stdout.write(output)
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == '__main__':
    sys.exit(main())
1 个帖子 - 1 位参与者