好久没有发帖子了,把最近解决 Claude Code 任务列表更新不及时的问题做个记录。
问题是发现CC任务完成了,任务列表显示还有未完成项,问一下它会说已经完成了,然后将任务列表更新,主要应该包含以下几种状态:
- 已完成的任务仍然显示为
pending或in_progress - 会话压缩之后,Claude 继续依赖旧的任务列表判断
- 结束会话前,任务列表状态没有被再次确认
- 多次
TaskCreate/TaskUpdate后,没有可靠机制提醒 Claude 做最终清理
但是一直这样也不是个事,看了下 Github 的 Issues,也有几个跟这个相关,处于还没有解决的状态。想了下,这个应该可以通过 Hook 解决。
只要本轮会话中发生过任务状态变更,就在关键时刻提醒 Claude 重新检查任务列表,并在停止前确保任务状态被清理。
添加一个本地 Python hook:task_cleanup_hook.py。它会维护一个很小的状态文件,用来记录当前会话是否发生过任务列表相关变化。主要监听几个 Claude Code 生命周期事件:
PostToolUse
当调用 TaskCreate、TaskUpdate、TaskList 后记录任务状态变化
PreCompact
在上下文压缩前记录任务列表可能需要恢复或核对
PostCompact
在压缩后提醒 Claude 重新读取任务列表
Stop
在会话结束前检查是否还有未确认的任务状态
当前我把自定义 hook 文件统一放在:
~/.claude/hooks/
然后在项目的 .claude/settings.local.json 中配置 hook command:
{
"hooks": {
"PostToolUse": [
{
"matcher": "TaskCreate|TaskUpdate|TaskList",
"hooks": [
{
"type": "command",
"command": "python ~/.claude/hooks/task_cleanup_hook.py post-tool-use",
"timeout": 5,
"statusMessage": "Recording task cleanup state"
}
]
}
],
"PreCompact": [
{
"matcher": "manual|auto",
"hooks": [
{
"type": "command",
"command": "python ~/.claude/hooks/task_cleanup_hook.py pre-compact",
"timeout": 5,
"statusMessage": "Snapshotting task cleanup state"
}
]
}
],
"PostCompact": [
{
"matcher": "manual|auto",
"hooks": [
{
"type": "command",
"command": "python ~/.claude/hooks/task_cleanup_hook.py post-compact",
"timeout": 5,
"statusMessage": "Checking post-compact task state"
}
]
}
],
"Stop": [
{
"hooks": [
{
"type": "command",
"command": "python ~/.claude/hooks/task_cleanup_hook.py stop",
"timeout": 5,
"statusMessage": "Checking task cleanup"
}
]
}
]
}
}
最后附上脚本文件
#!/usr/bin/env python3
import json
import os
import re
import sys
import time
from collections import deque
from pathlib import Path
STATE_PATH = Path.home() / ".claude" / "hooks" / "task_cleanup_state.json"
OPEN_STATUSES = {"pending", "in_progress"}
TRACKED_TOOLS = {"TaskCreate", "TaskUpdate", "TaskList"}
MAX_TRANSCRIPT_LINES = 2000
MAX_EXCERPT_CHARS = 4000
COMPACT_REBIND_SECONDS = 30 * 60
def emit(payload):
sys.stdout.write(json.dumps(payload, separators=(",", ":")) + "\n")
def read_payload():
raw = sys.stdin.read()
if not raw.strip():
return {}
try:
return json.loads(raw)
except json.JSONDecodeError:
return {}
def load_state():
try:
with STATE_PATH.open("r", encoding="utf-8") as f:
state = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
state = {}
cutoff = time.time() - 7 * 24 * 60 * 60
return {
key: value
for key, value in state.items()
if value.get("updated_at", 0) >= cutoff
}
def save_state(state):
STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
tmp_path = STATE_PATH.with_suffix(".tmp")
with tmp_path.open("w", encoding="utf-8") as f:
json.dump(state, f, separators=(",", ":"))
os.replace(tmp_path, STATE_PATH)
def session_key(payload):
return str(
payload.get("session_id")
or payload.get("transcript_path")
or payload.get("cwd")
or "default"
)
def compact_kind(payload):
return str(
payload.get("compact_type")
or payload.get("trigger")
or payload.get("matcher")
or payload.get("source")
or "compact"
)
def collect_strings(value):
if isinstance(value, str):
return [value]
if isinstance(value, dict):
strings = []
for item in value.values():
strings.extend(collect_strings(item))
return strings
if isinstance(value, list):
strings = []
for item in value:
strings.extend(collect_strings(item))
return strings
return []
def response_text(payload):
parts = []
for key in ("tool_response", "tool_result", "result", "response", "output"):
parts.extend(collect_strings(payload.get(key, {})))
return "\n".join(parts)
def task_list_state(text):
if not text or not text.strip():
return None
lower = text.lower()
clean_markers = ("no tasks found", "no tasks", "empty task list")
if any(marker in lower for marker in clean_markers):
return "clean"
statuses = re.findall(r"\[(pending|in_progress|completed)\]", lower)
if statuses:
return "open" if any(status in OPEN_STATUSES for status in statuses) else "clean"
if re.search(r"\b(pending|in_progress)\b", lower):
return "open"
if re.search(r"\bcompleted\b", lower):
return "clean"
return None
def transcript_path(payload):
raw_path = payload.get("transcript_path")
if not raw_path:
return None
path = Path(raw_path).expanduser()
return path if path.exists() else None
def recent_transcript_lines(payload):
path = transcript_path(payload)
if not path:
return []
try:
with path.open("r", encoding="utf-8", errors="replace") as f:
return list(deque(f, maxlen=MAX_TRANSCRIPT_LINES))
except OSError:
return []
def text_from_block_content(content):
return "\n".join(collect_strings(content))
def latest_tasklist_text_from_transcript(payload):
tool_names = {}
latest = ""
for line in recent_transcript_lines(payload):
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
message = event.get("message") or {}
blocks = message.get("content")
if not isinstance(blocks, list):
continue
for block in blocks:
if not isinstance(block, dict):
continue
if block.get("type") == "tool_use":
tool_names[block.get("id")] = block.get("name")
continue
if block.get("type") != "tool_result":
continue
if tool_names.get(block.get("tool_use_id")) == "TaskList":
latest = text_from_block_content(block.get("content"))
return latest[-MAX_EXCERPT_CHARS:]
def observed_tasklist_state(payload):
text = response_text(payload)
state = task_list_state(text)
if state is not None:
return state, text[-MAX_EXCERPT_CHARS:]
text = latest_tasklist_text_from_transcript(payload)
return task_list_state(text), text[-MAX_EXCERPT_CHARS:]
def entry_metadata(payload, tool_name):
return {
"last_tool": tool_name,
"updated_at": time.time(),
"cwd": payload.get("cwd"),
"transcript_path": payload.get("transcript_path"),
}
def mark_dirty(state, key, payload, tool_name, reason=None):
entry = state.get(key, {})
entry.update(entry_metadata(payload, tool_name))
entry["dirty"] = True
if reason:
entry["reason"] = reason
state[key] = entry
return entry
def mark_clean(state, key, payload, tool_name):
entry = state.get(key, {})
entry.update(entry_metadata(payload, tool_name))
entry.update({
"dirty": False,
"reason": None,
"compact_requires_tasklist": False,
"compact_pending": False,
"last_tasklist_state": "clean",
})
state[key] = entry
return entry
def related_entry(state, payload):
key = session_key(payload)
if key in state:
return key, state[key]
cwd = payload.get("cwd")
if not cwd:
return key, {}
now = time.time()
candidates = [
(candidate_key, value)
for candidate_key, value in state.items()
if value.get("cwd") == cwd
and value.get("compact_pending")
and now - value.get("updated_at", 0) <= COMPACT_REBIND_SECONDS
]
if not candidates:
return key, {}
return max(candidates, key=lambda item: item[1].get("updated_at", 0))
def post_tool_use():
payload = read_payload()
tool_name = payload.get("tool_name")
if tool_name not in TRACKED_TOOLS:
emit({"suppressOutput": True})
return
state = load_state()
key = session_key(payload)
if tool_name == "TaskList":
list_state, text = observed_tasklist_state(payload)
if list_state == "clean":
entry = mark_clean(state, key, payload, tool_name)
elif list_state == "open":
entry = mark_dirty(state, key, payload, tool_name, "open_tasks")
entry["last_tasklist_state"] = "open"
else:
entry = mark_dirty(state, key, payload, tool_name, "unconfirmed_tasklist")
entry["last_tasklist_state"] = "unknown"
if text:
entry["last_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
else:
mark_dirty(state, key, payload, tool_name, "task_state_changed")
save_state(state)
emit({"suppressOutput": True})
def pre_compact():
payload = read_payload()
state = load_state()
key, entry = related_entry(state, payload)
key = session_key(payload) if key not in state else key
list_state, text = observed_tasklist_state(payload)
should_reconcile = bool(entry.get("dirty")) or list_state == "open"
if not should_reconcile:
emit({"suppressOutput": True})
return
entry.update(entry_metadata(payload, "PreCompact"))
entry.update({
"dirty": True,
"reason": "pre_compact_task_reconcile",
"compact_pending": True,
"compact_requires_tasklist": True,
"compact_kind": compact_kind(payload),
"precompact_had_open_tasks": list_state == "open" or bool(entry.get("dirty")),
})
if text:
entry["precompact_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
state[key] = entry
save_state(state)
emit({
"hookSpecificOutput": {
"hookEventName": "PreCompact",
"additionalContext": "Before compacting, preserve that task-list state may require reconciliation. After compaction, call TaskList before relying on task status, then update any pending or in_progress tasks that are already complete."
},
"suppressOutput": True,
})
def post_compact():
payload = read_payload()
state = load_state()
current_key = session_key(payload)
source_key, entry = related_entry(state, payload)
should_reconcile = bool(entry.get("dirty")) or bool(entry.get("compact_pending"))
if not should_reconcile:
emit({"suppressOutput": True})
return
entry = dict(entry)
entry.update(entry_metadata(payload, "PostCompact"))
entry.update({
"dirty": True,
"reason": "post_compact_task_reconcile",
"compact_pending": False,
"compact_requires_tasklist": True,
"compact_kind": compact_kind(payload),
"postcompact_at": time.time(),
})
state[current_key] = entry
if source_key != current_key and source_key in state:
state[source_key]["compact_rebound_to"] = current_key
save_state(state)
emit({
"hookSpecificOutput": {
"hookEventName": "PostCompact",
"additionalContext": "A compaction just occurred after task-list activity. Call TaskList before continuing, compare it with the work completed so far, and use TaskUpdate to reconcile stale pending or in_progress tasks before stopping."
},
"suppressOutput": True,
})
def stop():
payload = read_payload()
state = load_state()
key, entry = related_entry(state, payload)
current_key = session_key(payload)
if key != current_key and entry:
state[current_key] = dict(entry)
key = current_key
entry = state[key]
if not entry.get("dirty") and not entry.get("compact_requires_tasklist"):
emit({"suppressOutput": True})
return
list_state, text = observed_tasklist_state(payload)
if list_state == "clean":
mark_clean(state, key, payload, "TaskList")
if text:
state[key]["last_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
save_state(state)
emit({"suppressOutput": True})
return
if list_state == "open" and text:
entry["last_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
entry["last_tasklist_state"] = "open"
state[key] = entry
save_state(state)
if entry.get("compact_requires_tasklist"):
reason = "Task cleanup required after compaction: call TaskList, reconcile any stale pending or in_progress tasks with TaskUpdate, then stop."
else:
reason = "Task cleanup required: this session used TaskCreate or TaskUpdate, and no later TaskList confirmed that all tasks are completed. Call TaskList, resolve any pending or in_progress tasks with TaskUpdate, then stop."
emit({
"decision": "block",
"reason": reason,
"suppressOutput": True,
})
def main():
mode = sys.argv[1] if len(sys.argv) > 1 else ""
if mode == "post-tool-use":
post_tool_use()
elif mode == "pre-compact":
pre_compact()
elif mode == "post-compact":
post_compact()
elif mode == "stop":
stop()
else:
emit({"suppressOutput": True})
if __name__ == "__main__":
main()
1 个帖子 - 1 位参与者