Claude Code 任务列表部分情况不实时更新的解决思路

好久没有发帖子了,把最近解决 Claude Code 任务列表更新不及时的问题做个记录。 问题是发现CC任务完成了,任务列表显示还有未完成项,问一下它会说已经完成了,然后将任务列表更新,主要应该包含以下几种状态: 已完成的任务仍然显示为 pending 或 in_progress 会话压缩之后,Cl...
Claude Code 任务列表部分情况不实时更新的解决思路
Claude Code 任务列表部分情况不实时更新的解决思路

好久没有发帖子了,把最近解决 Claude Code 任务列表更新不及时的问题做个记录。

问题是:CC 的任务其实已经完成了,但任务列表里仍显示有未完成项;问它一下,它会回答已经完成,然后才去更新任务列表。主要有以下几种情况:

  • 已完成的任务仍然显示为 pending 或 in_progress
  • 会话压缩之后,Claude 继续依赖旧的任务列表判断
  • 结束会话前,任务列表状态没有被再次确认
  • 多次 TaskCreate / TaskUpdate 后,没有可靠机制提醒 Claude 做最终清理

但是一直这样也不是个事,看了下 GitHub 的 Issues,也有几个与此相关的问题,目前都还没有解决。想了下,这个应该可以通过 Hook 解决。

只要本轮会话中发生过任务状态变更,就在关键时刻提醒 Claude 重新检查任务列表,并在停止前确保任务状态被清理。

添加一个本地 Python hook:task_cleanup_hook.py。它会维护一个很小的状态文件,用来记录当前会话是否发生过任务列表相关变化。主要监听几个 Claude Code 生命周期事件:

| Hook 事件 | 用途 |
| --- | --- |
| PostToolUse | 当调用 TaskCreate、TaskUpdate、TaskList 后记录任务状态变化 |
| PreCompact | 在上下文压缩前记录任务列表可能需要恢复或核对 |
| PostCompact | 在压缩后提醒 Claude 重新读取任务列表 |
| Stop | 在会话结束前检查是否还有未确认的任务状态 |

当前我把自定义 hook 文件统一放在:

~/.claude/hooks/

然后在项目的 .claude/settings.local.json 中配置 hook command

{
  "hooks": {
    "PostToolUse": [
      {
        "matcher": "TaskCreate|TaskUpdate|TaskList",
        "hooks": [
          {
            "type": "command",
            "command": "python ~/.claude/hooks/task_cleanup_hook.py post-tool-use",
            "timeout": 5,
            "statusMessage": "Recording task cleanup state"
          }
        ]
      }
    ],
    "PreCompact": [
      {
        "matcher": "manual|auto",
        "hooks": [
          {
            "type": "command",
            "command": "python ~/.claude/hooks/task_cleanup_hook.py pre-compact",
            "timeout": 5,
            "statusMessage": "Snapshotting task cleanup state"
          }
        ]
      }
    ],
    "PostCompact": [
      {
        "matcher": "manual|auto",
        "hooks": [
          {
            "type": "command",
            "command": "python ~/.claude/hooks/task_cleanup_hook.py post-compact",
            "timeout": 5,
            "statusMessage": "Checking post-compact task state"
          }
        ]
      }
    ],
    "Stop": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "python ~/.claude/hooks/task_cleanup_hook.py stop",
            "timeout": 5,
            "statusMessage": "Checking task cleanup"
          }
        ]
      }
    ]
  }
}

最后附上脚本文件

#!/usr/bin/env python3
import json
import os
import re
import sys
import time
from collections import deque
from pathlib import Path

# JSON file in which load_state()/save_state() keep per-session cleanup state.
STATE_PATH = Path.home() / ".claude" / "hooks" / "task_cleanup_state.json"
# Task statuses that task_list_state() treats as unfinished.
OPEN_STATUSES = {"pending", "in_progress"}
# Tool names that post_tool_use() records; any other tool is ignored.
TRACKED_TOOLS = {"TaskCreate", "TaskUpdate", "TaskList"}
# Only this many trailing lines of the transcript file are scanned.
MAX_TRANSCRIPT_LINES = 2000
# Upper bound on the trailing characters kept when storing a text excerpt.
MAX_EXCERPT_CHARS = 4000
# Maximum age (30 minutes, in seconds) of a compact_pending entry that
# related_entry() may still match by cwd.
COMPACT_REBIND_SECONDS = 30 * 60


def emit(payload):
    """Write *payload* to stdout as one line of compact JSON."""
    encoded = json.dumps(payload, separators=(",", ":"))
    sys.stdout.write(f"{encoded}\n")


def read_payload():
    """Read the hook payload from stdin and return it as a dict.

    Returns:
        The decoded JSON object. An empty dict is returned when stdin is
        empty or whitespace-only, is not valid JSON, or decodes to
        something other than a JSON object.
    """
    raw = sys.stdin.read()
    if not raw.strip():
        return {}
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    # Every caller uses payload.get(...); a JSON array, string or number
    # would otherwise surface there as an AttributeError.
    return payload if isinstance(payload, dict) else {}


def load_state():
    """Load the persisted state, dropping stale or malformed entries.

    Returns:
        A dict mapping session keys to entry dicts whose ``updated_at`` lies
        within the last seven days. A missing, unreadable or corrupt state
        file yields an empty dict; entries that are not dicts, or whose
        ``updated_at`` is not a number, are discarded.
    """
    try:
        with STATE_PATH.open("r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        state = {}
    if not isinstance(state, dict):
        state = {}

    cutoff = time.time() - 7 * 24 * 60 * 60
    fresh = {}
    for key, value in state.items():
        # Callers treat every entry as a dict; skip anything else rather
        # than letting a hand-edited or truncated file crash the hook.
        if not isinstance(value, dict):
            continue
        updated_at = value.get("updated_at", 0)
        if isinstance(updated_at, (int, float)) and updated_at >= cutoff:
            fresh[key] = value
    return fresh


def save_state(state):
    """Persist *state* as compact JSON at STATE_PATH.

    The parent directory is created if needed. The document is first written
    to a sibling file with a ``.tmp`` suffix and then moved over STATE_PATH
    with ``os.replace``.
    """
    target_dir = STATE_PATH.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    scratch = STATE_PATH.with_suffix(".tmp")
    with scratch.open("w", encoding="utf-8") as handle:
        json.dump(state, handle, separators=(",", ":"))

    os.replace(scratch, STATE_PATH)


def session_key(payload):
    """Return the string key under which this session's state is stored.

    The first truthy value among ``session_id``, ``transcript_path`` and
    ``cwd`` (checked in that order) is used; ``"default"`` is the fallback.
    """
    for field in ("session_id", "transcript_path", "cwd"):
        candidate = payload.get(field)
        if candidate:
            return str(candidate)
    return "default"


def compact_kind(payload):
    """Return a string label for the compaction described by *payload*.

    The first truthy value among ``compact_type``, ``trigger``, ``matcher``
    and ``source`` (checked in that order) is used; ``"compact"`` is the
    fallback.
    """
    for field in ("compact_type", "trigger", "matcher", "source"):
        candidate = payload.get(field)
        if candidate:
            return str(candidate)
    return "compact"


def collect_strings(value):
    """Return every string found in *value*, in traversal order.

    A string yields itself; dicts (values only) and lists are searched
    recursively. Any other type contributes nothing.
    """
    if isinstance(value, str):
        return [value]

    if isinstance(value, dict):
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return []

    return [text for child in children for text in collect_strings(child)]


def response_text(payload):
    """Join all strings found under the payload's response-like keys.

    The keys ``tool_response``, ``tool_result``, ``result``, ``response`` and
    ``output`` are visited in that order; every string collected from them
    is joined with newlines.
    """
    candidate_keys = ("tool_response", "tool_result", "result", "response", "output")
    return "\n".join(
        text
        for key in candidate_keys
        for text in collect_strings(payload.get(key, {}))
    )


def task_list_state(text):
    """Classify task-list text as ``"open"``, ``"clean"`` or unknown.

    Signals are checked from most to least explicit:

    1. Bracketed status tags (``[pending]``, ``[in_progress]``,
       ``[completed]``): any open tag means ``"open"``, otherwise ``"clean"``.
    2. Free-text "no tasks" markers: ``"clean"``.
    3. Bare status words: ``pending``/``in_progress`` mean ``"open"``,
       ``completed`` means ``"clean"``.

    Matching is case-insensitive.

    Returns:
        ``"open"``, ``"clean"``, or ``None`` when *text* is empty or carries
        no recognizable signal.
    """
    if not text or not text.strip():
        return None

    lower = text.lower()

    # Explicit status tags win over the free-text markers below: a pending
    # task whose subject happens to contain "no tasks" must not be reported
    # as clean.
    statuses = re.findall(r"\[(pending|in_progress|completed)\]", lower)
    if statuses:
        return "open" if any(status in OPEN_STATUSES for status in statuses) else "clean"

    clean_markers = ("no tasks found", "no tasks", "empty task list")
    if any(marker in lower for marker in clean_markers):
        return "clean"

    if re.search(r"\b(pending|in_progress)\b", lower):
        return "open"
    if re.search(r"\bcompleted\b", lower):
        return "clean"

    return None


def transcript_path(payload):
    """Return the payload's transcript location as a Path, or None.

    None is returned when ``transcript_path`` is missing or empty, or when
    the user-expanded path does not exist.
    """
    location = payload.get("transcript_path")
    if location:
        candidate = Path(location).expanduser()
        if candidate.exists():
            return candidate
    return None


def recent_transcript_lines(payload):
    """Return up to MAX_TRANSCRIPT_LINES trailing lines of the transcript.

    An empty list is returned when transcript_path() yields nothing or the
    file cannot be read (OSError). Undecodable bytes are replaced rather
    than raising.
    """
    tail = []
    source = transcript_path(payload)
    if source:
        try:
            with source.open("r", encoding="utf-8", errors="replace") as handle:
                # A bounded deque keeps only the last N lines while the
                # file is streamed.
                tail = list(deque(handle, maxlen=MAX_TRANSCRIPT_LINES))
        except OSError:
            tail = []
    return tail


def text_from_block_content(content):
    """Flatten *content* into one string, joining its strings with newlines."""
    pieces = collect_strings(content)
    return "\n".join(pieces)


def latest_tasklist_text_from_transcript(payload):
    """Return the text of the most recent TaskList result in the transcript.

    Each line from recent_transcript_lines() is parsed as JSON. ``tool_use``
    blocks in ``message.content`` are remembered by id, and the content of
    every ``tool_result`` block whose ``tool_use_id`` maps to ``"TaskList"``
    replaces the previous candidate, so the last one wins.

    Lines that are not valid JSON, are not JSON objects, or lack a dict
    ``message`` with a list ``content`` are skipped.

    Returns:
        At most the trailing MAX_EXCERPT_CHARS characters of that text, or
        ``""`` when no TaskList result was found.
    """
    tool_names = {}
    latest = ""

    for line in recent_transcript_lines(payload):
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A line can be valid JSON without being an object (bare string,
        # number, list); skip it instead of failing on .get().
        if not isinstance(event, dict):
            continue

        message = event.get("message")
        if not isinstance(message, dict):
            continue
        blocks = message.get("content")
        if not isinstance(blocks, list):
            continue

        for block in blocks:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")
            if block_type == "tool_use":
                tool_names[block.get("id")] = block.get("name")
            elif block_type == "tool_result":
                if tool_names.get(block.get("tool_use_id")) == "TaskList":
                    latest = text_from_block_content(block.get("content"))

    return latest[-MAX_EXCERPT_CHARS:]


def observed_tasklist_state(payload):
    """Determine the task-list state from the payload, else the transcript.

    The payload's own response text is classified first. Only when that
    yields no verdict is the latest TaskList text from the transcript used.

    Returns:
        A ``(state, excerpt)`` tuple: ``state`` is whatever task_list_state()
        returned for the text that was used, and ``excerpt`` is that text
        truncated to its trailing MAX_EXCERPT_CHARS characters.
    """
    direct = response_text(payload)
    verdict = task_list_state(direct)
    if verdict is None:
        fallback = latest_tasklist_text_from_transcript(payload)
        return task_list_state(fallback), fallback[-MAX_EXCERPT_CHARS:]
    return verdict, direct[-MAX_EXCERPT_CHARS:]


def entry_metadata(payload, tool_name):
    """Build the bookkeeping fields stamped onto a state entry.

    Returns:
        A dict with ``last_tool`` (*tool_name*), ``updated_at`` (current
        time), and the payload's ``cwd`` and ``transcript_path`` (None when
        absent).
    """
    metadata = {"last_tool": tool_name, "updated_at": time.time()}
    for field in ("cwd", "transcript_path"):
        metadata[field] = payload.get(field)
    return metadata


def mark_dirty(state, key, payload, tool_name, reason=None):
    """Flag the entry stored under *key* as dirty and return it.

    The existing entry (or a new empty one) is refreshed with
    entry_metadata() and ``dirty=True``. ``reason`` is overwritten only when
    a truthy *reason* is given. The entry is stored back into *state*.
    """
    record = state.get(key, {})
    record.update(entry_metadata(payload, tool_name), dirty=True)
    if reason:
        record["reason"] = reason
    state[key] = record
    return record


def mark_clean(state, key, payload, tool_name):
    """Flag the entry stored under *key* as clean and return it.

    The existing entry (or a new empty one) is refreshed with
    entry_metadata(), its dirty and compaction flags are cleared, its reason
    is reset to None, and ``last_tasklist_state`` is set to ``"clean"``. The
    entry is stored back into *state*.
    """
    record = state.get(key, {})
    record.update(entry_metadata(payload, tool_name))
    record["dirty"] = False
    record["reason"] = None
    record["compact_requires_tasklist"] = False
    record["compact_pending"] = False
    record["last_tasklist_state"] = "clean"
    state[key] = record
    return record


def related_entry(state, payload):
    """Find the state entry that belongs to this payload.

    An entry stored under the payload's own session key wins. Otherwise the
    most recently updated entry is chosen among those that share the
    payload's ``cwd``, have ``compact_pending`` set, and were updated within
    COMPACT_REBIND_SECONDS.

    Returns:
        A ``(key, entry)`` tuple. When nothing matches, ``key`` is the
        payload's session key and ``entry`` is a new empty dict.
    """
    own_key = session_key(payload)
    if own_key in state:
        return own_key, state[own_key]

    cwd = payload.get("cwd")
    if not cwd:
        return own_key, {}

    now = time.time()
    best = None
    for candidate_key, value in state.items():
        if value.get("cwd") != cwd:
            continue
        if not value.get("compact_pending"):
            continue
        if now - value.get("updated_at", 0) > COMPACT_REBIND_SECONDS:
            continue
        # Strict ">" keeps the earliest candidate when timestamps tie.
        if best is None or value.get("updated_at", 0) > best[1].get("updated_at", 0):
            best = (candidate_key, value)

    if best is None:
        return own_key, {}
    return best


def post_tool_use():
    """Handle PostToolUse: record task-list activity for this session.

    Tools outside TRACKED_TOOLS are ignored. TaskList marks the session
    clean or dirty according to the observed list state and stores an
    excerpt of the text; any other tracked tool marks the session dirty.
    The hook always emits ``{"suppressOutput": true}``.
    """
    payload = read_payload()
    tool_name = payload.get("tool_name")

    if tool_name in TRACKED_TOOLS:
        state = load_state()
        key = session_key(payload)

        if tool_name != "TaskList":
            mark_dirty(state, key, payload, tool_name, "task_state_changed")
        else:
            list_state, text = observed_tasklist_state(payload)
            if list_state == "clean":
                entry = mark_clean(state, key, payload, tool_name)
            else:
                # Anything that is not a confirmed "open" counts as unknown.
                is_open = list_state == "open"
                entry = mark_dirty(
                    state,
                    key,
                    payload,
                    tool_name,
                    "open_tasks" if is_open else "unconfirmed_tasklist",
                )
                entry["last_tasklist_state"] = "open" if is_open else "unknown"

            if text:
                entry["last_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]

        save_state(state)

    emit({"suppressOutput": True})


def pre_compact():
    """Handle PreCompact: flag the session when tasks may need reconciling.

    Nothing is recorded when the related entry is not dirty and the observed
    task list is not open. Otherwise the entry is marked dirty with
    compaction flags, an excerpt of the task-list text is stored, and a
    reminder is emitted as additional context.
    """
    payload = read_payload()
    state = load_state()
    key, entry = related_entry(state, payload)
    if key not in state:
        key = session_key(payload)

    list_state, text = observed_tasklist_state(payload)
    # Computed before the entry is modified, so it reflects the prior state.
    had_open = list_state == "open" or bool(entry.get("dirty"))
    if not had_open:
        emit({"suppressOutput": True})
        return

    entry.update(entry_metadata(payload, "PreCompact"))
    entry["dirty"] = True
    entry["reason"] = "pre_compact_task_reconcile"
    entry["compact_pending"] = True
    entry["compact_requires_tasklist"] = True
    entry["compact_kind"] = compact_kind(payload)
    entry["precompact_had_open_tasks"] = had_open
    if text:
        entry["precompact_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
    state[key] = entry
    save_state(state)

    reminder = "Before compacting, preserve that task-list state may require reconciliation. After compaction, call TaskList before relying on task status, then update any pending or in_progress tasks that are already complete."
    emit({
        "hookSpecificOutput": {
            "hookEventName": "PreCompact",
            "additionalContext": reminder,
        },
        "suppressOutput": True,
    })


def post_compact():
    """Handle PostCompact: carry pending cleanup over to the current session.

    Nothing is recorded when the related entry is neither dirty nor
    compact_pending. Otherwise a copy of it is stored under the current
    session key with ``compact_requires_tasklist`` set. If the entry came
    from a different key, that source entry is annotated with
    ``compact_rebound_to``. A reminder is emitted as additional context.
    """
    payload = read_payload()
    state = load_state()
    current_key = session_key(payload)
    source_key, found = related_entry(state, payload)

    if not (found.get("dirty") or found.get("compact_pending")):
        emit({"suppressOutput": True})
        return

    # Work on a copy so the source entry is not overwritten when the keys differ.
    rebound = dict(found)
    rebound.update(entry_metadata(payload, "PostCompact"))
    rebound["dirty"] = True
    rebound["reason"] = "post_compact_task_reconcile"
    rebound["compact_pending"] = False
    rebound["compact_requires_tasklist"] = True
    rebound["compact_kind"] = compact_kind(payload)
    rebound["postcompact_at"] = time.time()
    state[current_key] = rebound

    if source_key != current_key and source_key in state:
        state[source_key]["compact_rebound_to"] = current_key
    save_state(state)

    reminder = "A compaction just occurred after task-list activity. Call TaskList before continuing, compare it with the work completed so far, and use TaskUpdate to reconcile stale pending or in_progress tasks before stopping."
    emit({
        "hookSpecificOutput": {
            "hookEventName": "PostCompact",
            "additionalContext": reminder,
        },
        "suppressOutput": True,
    })


def stop():
    """Handle Stop: block stopping once while task cleanup is unconfirmed.

    Steps:
      1. Find the related entry; if it was stored under another key, copy
         it to the current session key.
      2. Allow the stop when the entry is neither dirty nor flagged
         ``compact_requires_tasklist``.
      3. Allow the stop, and mark the entry clean, when the observed task
         list is clean.
      4. Otherwise emit ``decision: "block"`` with a cleanup instruction,
         unless the payload reports ``stop_hook_active``. In that case the
         stop is allowed and the entry stays dirty, so a later stop
         attempt is checked again.
    """
    payload = read_payload()
    state = load_state()
    key, entry = related_entry(state, payload)
    current_key = session_key(payload)
    if key != current_key and entry:
        state[current_key] = dict(entry)
        key = current_key
        entry = state[key]

    if not entry.get("dirty") and not entry.get("compact_requires_tasklist"):
        emit({"suppressOutput": True})
        return

    list_state, text = observed_tasklist_state(payload)
    if list_state == "clean":
        mark_clean(state, key, payload, "TaskList")
        if text:
            state[key]["last_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
        save_state(state)
        emit({"suppressOutput": True})
        return

    if list_state == "open" and text:
        entry["last_tasklist_excerpt"] = text[-MAX_EXCERPT_CHARS:]
        entry["last_tasklist_state"] = "open"
        state[key] = entry
        save_state(state)

    # Per the Claude Code hooks reference, stop_hook_active is true when the
    # agent is already continuing because a Stop hook blocked it. Blocking
    # again would loop forever whenever tasks are legitimately still open
    # (for example while waiting for user input) or the list cannot be
    # parsed, so the stop is let through this time.
    if payload.get("stop_hook_active"):
        emit({"suppressOutput": True})
        return

    if entry.get("compact_requires_tasklist"):
        reason = "Task cleanup required after compaction: call TaskList, reconcile any stale pending or in_progress tasks with TaskUpdate, then stop."
    else:
        reason = "Task cleanup required: this session used TaskCreate or TaskUpdate, and no later TaskList confirmed that all tasks are completed. Call TaskList, resolve any pending or in_progress tasks with TaskUpdate, then stop."

    emit({
        "decision": "block",
        "reason": reason,
        "suppressOutput": True,
    })


def main():
    """Dispatch to the hook handler named by the first command-line argument.

    Recognized modes are ``post-tool-use``, ``pre-compact``, ``post-compact``
    and ``stop``. A missing or unknown mode emits
    ``{"suppressOutput": true}`` and does nothing else.
    """
    handlers = {
        "post-tool-use": post_tool_use,
        "pre-compact": pre_compact,
        "post-compact": post_compact,
        "stop": stop,
    }
    mode = sys.argv[1] if len(sys.argv) > 1 else ""
    handler = handlers.get(mode)
    if handler is None:
        emit({"suppressOutput": True})
    else:
        handler()


if __name__ == "__main__":
    main()

1 个帖子 - 1 位参与者

阅读完整话题

来源: LinuxDo 最新话题查看原文