"""Shared constants: empty containers, tags, reserved keys and sentinels.

Every string constant below is created with ``sys.intern`` so that equal
values share a single string object.
"""

import sys
from os import getenv
from types import MappingProxyType
from typing import Any, Literal, Mapping, cast

from langgraph.types import Interrupt, Send  # noqa: F401

# Interrupt, Send re-exported for backwards compatibility

# --- Empty read-only containers ---
EMPTY_MAP: Mapping[str, Any] = MappingProxyType({})
EMPTY_SEQ: tuple[str, ...] = ()
# unique sentinel instance, distinct from every other object
MISSING = object()

# --- Public constants ---
TAG_NOSTREAM = sys.intern("langsmith:nostream")
"""Tag to disable streaming for a chat model."""
TAG_HIDDEN = sys.intern("langsmith:hidden")
"""Tag to hide a node/edge from certain tracing/streaming environments."""
START = sys.intern("__start__")
"""The first (maybe virtual) node in graph-style Pregel."""
END = sys.intern("__end__")
"""The last (maybe virtual) node in graph-style Pregel."""
SELF = sys.intern("__self__")
"""The implicit branch that handles each node's Control values."""

# --- Reserved write keys ---
# (in this and the following sections, each comment describes the constant
# defined directly below it)
# for values passed as input to the graph
INPUT = sys.intern("__input__")
# for dynamic interrupts raised by nodes
INTERRUPT = sys.intern("__interrupt__")
# for values passed to resume a node after an interrupt
RESUME = sys.intern("__resume__")
# for errors raised by nodes
ERROR = sys.intern("__error__")
# marker to signal node didn't write anything
NO_WRITES = sys.intern("__no_writes__")
# marker to signal node was scheduled (in distributed mode)
SCHEDULED = sys.intern("__scheduled__")
# for Send objects returned by nodes/edges, corresponds to PUSH below
TASKS = sys.intern("__pregel_tasks")
# for writes of a task where we simply record the return value
RETURN = sys.intern("__return__")

# --- Reserved config.configurable keys ---
# holds the `write` function that accepts writes to state/edges/reserved keys
CONFIG_KEY_SEND = sys.intern("__pregel_send")
# holds the `read` function that returns a copy of the current state
CONFIG_KEY_READ = sys.intern("__pregel_read")
# holds the `call` function that accepts a node/func, args and returns a future
CONFIG_KEY_CALL = sys.intern("__pregel_call")
# holds a `BaseCheckpointSaver` passed from parent graph to child graphs
CONFIG_KEY_CHECKPOINTER = sys.intern("__pregel_checkpointer")
# holds a `StreamProtocol` passed from parent graph to child graphs
CONFIG_KEY_STREAM = sys.intern("__pregel_stream")
# holds a `StreamWriter` for stream_mode=custom
CONFIG_KEY_STREAM_WRITER = sys.intern("__pregel_stream_writer")
# holds a `BaseStore` made available to managed values
CONFIG_KEY_STORE = sys.intern("__pregel_store")
# holds a boolean indicating if subgraphs should resume from a previous
# checkpoint
CONFIG_KEY_RESUMING = sys.intern("__pregel_resuming")
# holds the task ID for the current task
CONFIG_KEY_TASK_ID = sys.intern("__pregel_task_id")
# holds a boolean indicating if tasks should be deduplicated
# (for distributed mode)
CONFIG_KEY_DEDUPE_TASKS = sys.intern("__pregel_dedupe_tasks")
# holds a boolean indicating whether to assert the requested checkpoint is
# the latest (for distributed mode)
CONFIG_KEY_ENSURE_LATEST = sys.intern("__pregel_ensure_latest")
# holds a boolean indicating whether to delegate subgraphs
# (for distributed mode)
CONFIG_KEY_DELEGATE = sys.intern("__pregel_delegate")
# holds a mapping of checkpoint_ns -> checkpoint_id for parent graphs
CONFIG_KEY_CHECKPOINT_MAP = sys.intern("checkpoint_map")
# holds the current checkpoint_id, if any
CONFIG_KEY_CHECKPOINT_ID = sys.intern("checkpoint_id")
# holds the current checkpoint_ns, "" for root graph
CONFIG_KEY_CHECKPOINT_NS = sys.intern("checkpoint_ns")
# holds the value that "answers" an interrupt() call
# NOTE(review): this description does not obviously match the key name
# ("node finished") -- confirm it is still accurate.
CONFIG_KEY_NODE_FINISHED = sys.intern("__pregel_node_finished")
# read-only list of existing task writes
CONFIG_KEY_WRITES = sys.intern("__pregel_writes")
# holds a mutable dict for temporary storage scoped to the current task
CONFIG_KEY_SCRATCHPAD = sys.intern("__pregel_scratchpad")

# --- Other constants ---
# denotes push-style tasks, ie. those created by Send objects
PUSH = sys.intern("__pregel_push")
# denotes pull-style tasks, ie. those triggered by edges
PULL = sys.intern("__pregel_pull")
# for checkpoint_ns, separates each level (ie. graph|subgraph|subsubgraph)
NS_SEP = sys.intern("|")
# for checkpoint_ns, for each level, separates the namespace from the task_id
NS_END = sys.intern(":")
# key for the configurable dict in RunnableConfig
CONF = cast(Literal["configurable"], sys.intern("configurable"))
# temporary flag to enable new Send semantics; True only when the
# LANGGRAPH_FF_SEND_V2 environment variable equals "true" (case-insensitive)
FF_SEND_V2 = getenv("LANGGRAPH_FF_SEND_V2", "false").lower() == "true"
# the task_id to use for writes that are not associated with a task
NULL_TASK_ID = sys.intern("00000000-0000-0000-0000-000000000000")

# Set of reserved strings drawn from the constants defined above.
# NOTE(review): RETURN, CONFIG_KEY_CALL, CONFIG_KEY_NODE_FINISHED,
# CONFIG_KEY_WRITES and CONFIG_KEY_SCRATCHPAD are defined above but are not
# members of this set -- confirm whether that omission is intentional.
RESERVED = {
    TAG_HIDDEN,
    # reserved write keys
    INPUT,
    INTERRUPT,
    RESUME,
    ERROR,
    NO_WRITES,
    SCHEDULED,
    TASKS,
    # reserved config.configurable keys
    CONFIG_KEY_SEND,
    CONFIG_KEY_READ,
    CONFIG_KEY_CHECKPOINTER,
    CONFIG_KEY_STREAM,
    CONFIG_KEY_STREAM_WRITER,
    CONFIG_KEY_STORE,
    CONFIG_KEY_CHECKPOINT_MAP,
    CONFIG_KEY_RESUMING,
    CONFIG_KEY_TASK_ID,
    CONFIG_KEY_DEDUPE_TASKS,
    CONFIG_KEY_ENSURE_LATEST,
    CONFIG_KEY_DELEGATE,
    CONFIG_KEY_CHECKPOINT_ID,
    CONFIG_KEY_CHECKPOINT_NS,
    # other constants
    PUSH,
    PULL,
    NS_SEP,
    NS_END,
    CONF,
}