From: Thomas Walker Lynch Date: Tue, 3 Feb 2026 08:28:47 +0000 (+0000) Subject: establishing tm in python X-Git-Url: https://git.reasoningtechnology.com/%5B%5E?a=commitdiff_plain;h=7c4f82540f85e511ee3270c05331e48fbe35148d;p=Epimetheus%2F.git establishing tm in python --- diff --git a/developer/authored/Symbol.py b/developer/authored/Symbol.py deleted file mode 100644 index 1240480..0000000 --- a/developer/authored/Symbol.py +++ /dev/null @@ -1,153 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - -""" -Symbol - -The Definition of Identity. - -Architecture: - - Symbol: The namespace and manager (Factory). - - Symbol.Instance: The atomic unit of identity (The Product). - -Constraints: - - Distinctness: Instances are unique. - - Opaque: No internal state leakage. - - Immutable Metadata: Name and Doc can be set exactly once. - - Static Manager: The Symbol class cannot be instantiated. -""" - -from __future__ import annotations - -import weakref -from typing import Optional - - -class Symbol: - """ - The Manager class. - Maintains the global state for symbol metadata and token generation. - """ - - def __new__(cls): - """ - Prevent instantiation. Symbol is a static namespace/factory. - """ - raise TypeError("The Symbol class is a static namespace and cannot be instantiated.") - - # Global registry for metadata. - # We use class-level storage so Instances remain lightweight (no back-ref needed). - _names: weakref.WeakKeyDictionary[Symbol.Instance ,str] = weakref.WeakKeyDictionary() - _docs: weakref.WeakKeyDictionary[Symbol.Instance ,str] = weakref.WeakKeyDictionary() - - # Inclusive bounding: Start at 0 (The Null Token) - _current_token: int = 0 - - class Instance: - """ - The atomic unit of identity. - """ - # __weakref__ required for WeakKeyDictionary keys - __slots__ = ( - '_token' - ,'__weakref__' - ) - - def __init__(self ,token: int): - self._token = token - - def __repr__(self) -> str: - return "" - - def __eq__(self ,other: object) -> bool: - if( isinstance(other ,Symbol.Instance) ): - return self._token == other._token - return NotImplemented - - def __hash__(self) -> int: - return hash(self._token) - - # ---------------------------------------------------------------------- - # Metadata Accessors - # ---------------------------------------------------------------------- - - @property - def name(self) -> Optional[str]: - """ - Returns the name of the symbol, or None if anonymous. - """ - return Symbol._names.get(self) - - @name.setter - def name(self ,value: str): - """ - Sets the name. - Raises RuntimeError if the name has already been set. - """ - if( self in Symbol._names ): - raise RuntimeError(f"Symbol name is immutable. Already set to '{Symbol._names[self]}'.") - Symbol._names[self] = value - - @property - def doc(self) -> str: - """ - Returns the docstring of the symbol, or "" if none. - """ - return Symbol._docs.get(self ,"") - - @doc.setter - def doc(self ,value: str): - """ - Sets the docstring. - Raises RuntimeError if the docstring has already been set. - Ignores empty strings (setting to "" is a no-op). - """ - if( not value ): return - - if( self in Symbol._docs ): - raise RuntimeError("Symbol docstring is immutable. Already set.") - - Symbol._docs[self] = value - - # ------------------------------------------------------------------------ - # Factory Methods - # ------------------------------------------------------------------------ - - @classmethod - def make(cls ,name: Optional[str] = None ,doc: str = "") -> Instance: - """ - Mints a new, distinct Original Symbol Instance. - - Args: - name: Optional name. If provided, it becomes immutable. - doc: Optional docstring. If provided (non-empty), it becomes immutable. - """ - # The Rest: Increment first, so the first public symbol is 1. - cls._current_token += 1 - token = cls._current_token - - instance = cls.Instance(token) - - if( name is not None ): - # Direct injection to bypass the "check if set" logic of the setter - cls._names[instance] = name - - if( doc ): - cls._docs[instance] = doc - - return instance - - # ------------------------------------------------------------------------ - # Static Initialization (The First) - # ------------------------------------------------------------------------ - # We perform this inside the class body using local names. - # This creates the Null Symbol immediately upon class definition. - - # Note: We must use 'Instance' (local scope) not 'Symbol.Instance' - # because 'Symbol' is not yet bound. - null = Instance(0) - _names[null] = "Null" - _docs[null] = "The Null Symbol" - - -# LocalWords: Accessors diff --git a/developer/authored/TapeMachine.py b/developer/authored/TapeMachine.py new file mode 100644 index 0000000..187b283 --- /dev/null +++ b/developer/authored/TapeMachine.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +import copy + +class TapeMachine: + """ + TTCA Tape Machine Implementation. + Adapts List, Map, and Set to a common Tape Interface. + """ + def __init__(self, tape_ref, path=None, point=0, iterator=None): + self.tape = tape_ref # The 'Memory' (Shared) + self.path = path or [] # The 'Stack' (Hierarchy context) + + # 'point' is the Address. + # For Lists: Integer Index + # For Maps: Key (Symbol) + # For Sets: The Item itself + self.point = point + + # For Maps/Sets, we need an iterator to support 'Step' (s) + self._iter = iterator + + # --- (e) Entangle --- + def e(self): + """ + Entangled Copy. + Returns a new head sharing the same tape memory. + """ + # We must clone the iterator state if possible, + # though Python iterators are hard to clone. + # We usually restart iteration or assume random access. + return TapeMachine(self.tape, self.path, self.point) + + # --- (r) Read --- + def r(self): + """Reads the cell under the head.""" + container = self._resolve_container() + + if isinstance(container, list): + if 0 <= self.point < len(container): + return container[self.point] + + elif isinstance(container, dict): + return container.get(self.point, None) + + elif isinstance(container, set): + # In a set, if we are 'at' a point, the value IS the point. + # But we must verify it still exists. + return self.point if self.point in container else None + + return None + + # --- (w) Write --- + def w(self, value): + """Writes to the cell under the head.""" + container = self._resolve_container() + + if isinstance(container, list): + container[self.point] = value + + elif isinstance(container, dict): + container[self.point] = value + + elif isinstance(container, set): + raise TypeError("Cannot 'Write' to a Set cell. Use 'd' (Delete) and 'a' (Add).") + + # --- (s) Step --- + def s(self, direction=1): + """ + Move relative to current position. + Direction +1 = Next, -1 = Previous. + """ + container = self._resolve_container() + + if isinstance(container, list): + self.point += direction + + elif isinstance(container, (dict, set)): + # Maps/Sets require iteration to step. + # This is expensive (O(N)) unless we maintain an active iterator. + # Simplified Logic: + try: + # In a real engine, we'd cache the list of keys + keys = list(container.keys()) if isinstance(container, dict) else list(container) + + # Find current index + try: + current_idx = keys.index(self.point) + next_idx = current_idx + direction + if 0 <= next_idx < len(keys): + self.point = keys[next_idx] + except ValueError: + # Current point no longer in set/map, reset to start + if keys: self.point = keys[0] + except: + pass + return self + + # --- (m) Move --- + def m(self, address): + """ + Absolute jump to an address. + List: index (int), Map: key (symbol), Set: member (symbol). + """ + self.point = address + return self + + # --- (a) Allocate / Add --- + def a(self, value, key=None): + """ + Appends or Inserts. + List: Append value. + Map: Insert key:value. + Set: Add value. + """ + container = self._resolve_container() + + if isinstance(container, list): + container.append(value) + + elif isinstance(container, dict): + if key is None: raise ValueError("Map allocation requires key") + container[key] = value + + elif isinstance(container, set): + container.add(value) + + # --- Hierarchy Navigation (Enter/Exit) --- + def enter(self): + """Descends into the current cell.""" + current_val = self.r() + + # Valid container? + if isinstance(current_val, (list, dict, set)): + # Push context + new_path = self.path + [(self._resolve_container(), self.point)] + + # Determine starting point for new container + start_point = 0 + if isinstance(current_val, (dict, set)) and len(current_val) > 0: + # Start at the first key/member + start_point = next(iter(current_val)) + + return TapeMachine(self.tape, new_path, start_point) + return None + + def exit(self): + """Ascends to parent.""" + if not self.path: return None + + # Pop context + parent_container, parent_point = self.path[-1] + new_path = self.path[:-1] + + return TapeMachine(self.tape, new_path, parent_point) + + # --- Internal --- + def _resolve_container(self): + """Drills down the path stack to find current container.""" + curr = self.tape + for container, index in self.path: + if isinstance(container, list): curr = container[index] + elif isinstance(container, dict): curr = container[index] + # Set traversal in path stack implies we 'entered' a member + # (which must be a container itself) + return curr diff --git a/developer/authored/deprecated/ObjectRegistry.py b/developer/authored/deprecated/ObjectRegistry.py deleted file mode 100644 index 9d30fd9..0000000 --- a/developer/authored/deprecated/ObjectRegistry.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - -""" -ObjectRegistry - -Maps Python runtime objects to ProcessLocalId. - -Strategies: - 1. Weak Identity (Preferred): For objects that support weakrefs (instances ,classes). - - ID lifetime is bound to object lifetime. - - Auto-cleaned on GC. - - 2. Value Identity (Fallback): For immutable primitives (int ,str ,tuple). - - ID is bound to the *value* (hash/equality). - - Stored strongly (since values like 42 or "red" are conceptually eternal). -""" - -from __future__ import annotations - -import weakref -from typing import Any ,Callable ,Dict ,Optional - -from .ProcessLocalId import ProcessLocalIdGenerator ,ProcessLocalId - - -class ObjectRegistry: - def __init__(self ,id_gen: ProcessLocalIdGenerator): - self._id_gen = id_gen - - # Strategy 1: Entities (Weakref-able) - self._obj_to_id_wkd: "weakref.WeakKeyDictionary[Any ,ProcessLocalId]" = weakref.WeakKeyDictionary() - self._id_to_obj_ref: Dict[ProcessLocalId ,weakref.ref] = {} - - # Strategy 2: Values (Hashable ,not weakref-able) - self._value_to_id: Dict[Any ,ProcessLocalId] = {} - - self._finalizers: Dict[ProcessLocalId ,Callable[[ProcessLocalId] ,None]] = {} - self._global_finalizer: Optional[Callable[[ProcessLocalId] ,None]] = None - - def register_finalizer(self ,fn: Callable[[ProcessLocalId] ,None]): - """ - Registers a finalizer callback invoked when any registered *weak-refable* object is GC'd. - """ - self._global_finalizer = fn - - def _on_collect(self ,obj_id: ProcessLocalId): - # Only called for Strategy 1 objects - ref = self._id_to_obj_ref.pop(obj_id ,None) - if(ref is not None): - # The WeakKeyDictionary auto-cleans the forward mapping , - # but we double check or clean any edge cases if needed. - pass - - fn = self._global_finalizer - if(fn is not None): - fn(obj_id) - - def get_id(self ,obj: Any) -> ProcessLocalId: - """ - Returns the ProcessLocalId for `obj` ,registering it if needed. - """ - # 1. Try WeakRef Strategy (Entities) - try: - existing = self._obj_to_id_wkd.get(obj) - if(existing is not None): - return existing - except TypeError: - # obj is not weakref-able (e.g. int ,str ,tuple ,or list). - # Fall through to Strategy 2. - pass - else: - # It IS weakref-able ,but wasn't in the dictionary. Register it. - obj_id = self._id_gen.next_id() - self._obj_to_id_wkd[obj] = obj_id - # Create reverse lookup with callback - self._id_to_obj_ref[obj_id] = weakref.ref(obj ,lambda _ref ,oid=obj_id: self._on_collect(oid)) - return obj_id - - # 2. Try Value Strategy (Primitives) - # Note: Mutable non-weakrefables (like standard lists) will fail here because they are unhashable. - try: - existing = self._value_to_id.get(obj) - if(existing is not None): - return existing - - # Register new value - obj_id = self._id_gen.next_id() - self._value_to_id[obj] = obj_id - return obj_id - except TypeError: - # It is neither weakref-able NOR hashable (e.g. standard list ,dict). - raise TypeError( - f"ObjectRegistry: cannot track object of type {type(obj)!r}. " - "It is neither weakref-able (Entity) nor hashable (Value)." - ) - - def try_get_object(self ,obj_id: ProcessLocalId) -> Optional[Any]: - """ - Best-effort: returns the live object/value. - """ - # Check Entities - ref = self._id_to_obj_ref.get(obj_id) - if(ref is not None): - return ref() - - # Check Values - # For now ,we assume values identify themselves. - for val ,pid in self._value_to_id.items(): - if(pid == obj_id): - return val - - return None diff --git a/developer/authored/deprecated/ProcessLocalId.py b/developer/authored/deprecated/ProcessLocalId.py deleted file mode 100644 index b57c47c..0000000 --- a/developer/authored/deprecated/ProcessLocalId.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - - -""" -ProcessLocalId - -A process-local identifier used as an internal key. - -Design constraint: - - NOT intended to be serialized or persisted. - - `repr()` intentionally does not reveal the numeric token, to discourage logging/persistence. -""" - -from __future__ import annotations - -from dataclasses import dataclass - - -@dataclass(frozen=True ,slots=True) -class ProcessLocalId: - _n: int - - def __repr__(self) -> str: - return "" - - def __str__(self) -> str: - return "" - - def as_int_UNSAFE(self) -> int: - """ - Returns the raw integer token. - - UNSAFE because: - - tokens are process-local - - do not write these into files/databases/logs as stable identifiers - """ - return self._n - - -class ProcessLocalIdGenerator: - """ - Monotonic generator; ids are never recycled. - """ - def __init__(self ,start: int = 1): - if start < 1: raise ValueError("start must be >= 1") - self._next_n: int = start - - def next_id(self) -> ProcessLocalId: - n = self._next_n - self._next_n += 1 - return ProcessLocalId(n) diff --git a/developer/authored/deprecated/Property.py b/developer/authored/deprecated/Property.py deleted file mode 100644 index 5b33226..0000000 --- a/developer/authored/deprecated/Property.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - - -""" -Property - -A Property is itself an entity (it has an Identity id) so that: - - properties can have properties - - properties can be members of semantic sets -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Optional ,Tuple - -from .ProcessLocalId import ProcessLocalId - - -@dataclass(frozen=True ,slots=True) -class Property: - id: ProcessLocalId - name_path: Tuple[str ,...] - doc: str = "" - - def __repr__(self) -> str: - # name_path is safe to reveal; id token is not. - return f"" diff --git a/developer/authored/deprecated/PropertyManager.py b/developer/authored/deprecated/PropertyManager.py deleted file mode 100644 index 36e2b9e..0000000 --- a/developer/authored/deprecated/PropertyManager.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - - -""" -PropertyManager - -Core RT property system. - -Key decisions vs the earlier `property_manager.py`: - - do NOT key by `object_path()` strings (avoids collisions) fileciteturn3file4 - - runtime objects are keyed by weak identity (ProcessLocalId assigned by ObjectRegistry) - - properties are first-class entities (Property has an id), so properties can have properties - -This remains process-local and in-memory. -""" - -from __future__ import annotations - -from typing import Any ,Dict ,Iterable ,List ,Optional ,Tuple ,Union - -from .ProcessLocalId import ProcessLocalIdGenerator ,ProcessLocalId -from .ObjectRegistry import ObjectRegistry -from .PropertyStore import PropertyStore -from .Property import Property -from .SemanticSets import SemanticSet ,SemanticSetStore -from .Syntax import SyntaxInstance - - -NamePathLike = Union[str ,List[str] ,Tuple[str ,...]] - - -class PropertyManager: - def __init__(self): - self._id_gen = ProcessLocalIdGenerator() - self._obj_reg = ObjectRegistry(self._id_gen) - self._store = PropertyStore() - self._sets = SemanticSetStore() - - # Declare-by-name registry - self._name_path_to_property: Dict[Tuple[str ,...],Property] = {} - self._property_id_to_property: Dict[ProcessLocalId ,Property] = {} - - self._name_path_to_set: Dict[Tuple[str ,...],SemanticSet] = {} - self._set_id_to_set: Dict[ProcessLocalId ,SemanticSet] = {} - - # Optional syntax instances (if user chooses to model them) - self._syntax_id_to_instance: Dict[ProcessLocalId ,SyntaxInstance] = {} - - # Finalization cleanup - self._obj_reg.register_finalizer(self._on_subject_finalized) - - def _on_subject_finalized(self ,subject_id: ProcessLocalId): - self._store.remove_subject(subject_id) - self._sets.remove_subject(subject_id) - - def _normalize_name_path(self ,name_path: NamePathLike) -> Tuple[str ,...]: - if isinstance(name_path ,tuple): return name_path - if isinstance(name_path ,list): return tuple(name_path) - if isinstance(name_path ,str): return tuple(name_path.split(".")) - raise TypeError("name_path must be str ,list[str] ,or tuple[str ,...]") - - # ------------------------- - # Identity acquisition - # ------------------------- - def id_of_py_object(self ,obj: Any) -> ProcessLocalId: - return self._obj_reg.get_id(obj) - - def create_syntax_identity(self ,syntax: SyntaxInstance) -> ProcessLocalId: - sid = self._id_gen.next_id() - self._syntax_id_to_instance[sid] = syntax - return sid - - def try_get_syntax(self ,syntax_id: ProcessLocalId) -> Optional[SyntaxInstance]: - return self._syntax_id_to_instance.get(syntax_id) - - # ------------------------- - # Property declaration - # ------------------------- - def declare_property(self ,name_path: NamePathLike ,doc: str = "") -> ProcessLocalId: - np = self._normalize_name_path(name_path) - existing = self._name_path_to_property.get(np) - if existing is not None: return existing.id - pid = self._id_gen.next_id() - p = Property(pid ,np ,doc) - self._name_path_to_property[np] = p - self._property_id_to_property[pid] = p - return pid - - def property_id(self ,name_path: NamePathLike) -> ProcessLocalId: - np = self._normalize_name_path(name_path) - p = self._name_path_to_property.get(np) - if p is None: raise KeyError(f"Property not declared: {np!r}") - return p.id - - def try_get_property(self ,prop_id: ProcessLocalId) -> Optional[Property]: - return self._property_id_to_property.get(prop_id) - - # ------------------------- - # Semantic sets - # ------------------------- - def declare_set(self ,name_path: NamePathLike ,doc: str = "") -> ProcessLocalId: - np = self._normalize_name_path(name_path) - existing = self._name_path_to_set.get(np) - if existing is not None: return existing.id - sid = self._id_gen.next_id() - s = SemanticSet(sid ,np ,doc) - self._name_path_to_set[np] = s - self._set_id_to_set[sid] = s - return sid - - def add_to_set(self ,subject: Any ,set_id: ProcessLocalId): - subject_id = self._coerce_subject_id(subject) - self._sets.add_member(set_id ,subject_id) - - def is_in_set(self ,subject: Any ,set_id: ProcessLocalId) -> bool: - subject_id = self._coerce_subject_id(subject) - return self._sets.has_member(set_id ,subject_id) - - def members(self ,set_id: ProcessLocalId) -> List[ProcessLocalId]: - return list(self._sets.members(set_id)) - - # ------------------------- - # Set/get properties - # ------------------------- - def set(self ,subject: Any ,prop: Union[ProcessLocalId ,NamePathLike] ,value: Any): - subject_id = self._coerce_subject_id(subject) - prop_id = self._coerce_property_id(prop) - self._store.set(subject_id ,prop_id ,value) - - def get(self ,subject: Any ,prop: Union[ProcessLocalId ,NamePathLike] ,default: Any = None) -> Any: - subject_id = self._coerce_subject_id(subject) - prop_id = self._coerce_property_id(prop) - return self._store.get(subject_id ,prop_id ,default) - - def has(self ,subject: Any ,prop: Union[ProcessLocalId ,NamePathLike]) -> bool: - subject_id = self._coerce_subject_id(subject) - prop_id = self._coerce_property_id(prop) - return self._store.has(subject_id ,prop_id) - - def subjects_with(self ,prop: Union[ProcessLocalId ,NamePathLike]) -> List[ProcessLocalId]: - prop_id = self._coerce_property_id(prop) - return list(self._store.subjects_with(prop_id)) - - # ------------------------- - # Coercions - # ------------------------- - def _coerce_subject_id(self ,subject: Any) -> ProcessLocalId: - if isinstance(subject ,ProcessLocalId): return subject - # For Python runtime objects, we require weakref-able instances. - return self._obj_reg.get_id(subject) - - def _coerce_property_id(self ,prop: Union[ProcessLocalId ,NamePathLike]) -> ProcessLocalId: - if isinstance(prop ,ProcessLocalId): return prop - return self.property_id(prop) diff --git a/developer/authored/deprecated/PropertyStore.py b/developer/authored/deprecated/PropertyStore.py deleted file mode 100644 index bc8f3f3..0000000 --- a/developer/authored/deprecated/PropertyStore.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - - -""" -PropertyStore - -Stores property values and maintains reverse lookups. - -This is intentionally process-local and in-memory. -""" - -from __future__ import annotations - -from typing import Any ,Dict ,Optional ,Set ,Tuple - -from .ProcessLocalId import ProcessLocalId - - -class PropertyStore: - def __init__(self): - # (subject_id ,property_id) -> value - self._values: Dict[Tuple[ProcessLocalId ,ProcessLocalId] ,Any] = {} - - # subject_id -> set(property_id) - self._subject_to_props: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} - - # property_id -> set(subject_id) - self._prop_to_subjects: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} - - def set(self ,subject_id: ProcessLocalId ,prop_id: ProcessLocalId ,value: Any): - key = (subject_id ,prop_id) - self._values[key] = value - self._subject_to_props.setdefault(subject_id ,set()).add(prop_id) - self._prop_to_subjects.setdefault(prop_id ,set()).add(subject_id) - - def get(self ,subject_id: ProcessLocalId ,prop_id: ProcessLocalId ,default: Any = None) -> Any: - return self._values.get((subject_id ,prop_id) ,default) - - def has(self ,subject_id: ProcessLocalId ,prop_id: ProcessLocalId) -> bool: - return (subject_id ,prop_id) in self._values - - def subjects_with(self ,prop_id: ProcessLocalId) -> Set[ProcessLocalId]: - return set(self._prop_to_subjects.get(prop_id ,set())) - - def props_of(self ,subject_id: ProcessLocalId) -> Set[ProcessLocalId]: - return set(self._subject_to_props.get(subject_id ,set())) - - def remove_subject(self ,subject_id: ProcessLocalId): - """ - Remove all stored properties for a subject (used on finalization). - """ - prop_ids = self._subject_to_props.pop(subject_id ,set()) - for prop_id in prop_ids: - self._values.pop((subject_id ,prop_id) ,None) - s = self._prop_to_subjects.get(prop_id) - if s is not None: - s.discard(subject_id) - if not s: self._prop_to_subjects.pop(prop_id ,None) diff --git a/developer/authored/deprecated/SemanticSets.py b/developer/authored/deprecated/SemanticSets.py deleted file mode 100644 index f0fdf48..0000000 --- a/developer/authored/deprecated/SemanticSets.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - - -""" -SemanticSets - -Membership sets over identities. Used for semantic typing. - -Design: - - set_id identifies the set - - members are subject ids - - reverse index for cleanup -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Dict ,Optional ,Set - -from .ProcessLocalId import ProcessLocalId - - -@dataclass(frozen=True ,slots=True) -class SemanticSet: - id: ProcessLocalId - name_path: tuple[str ,...] - doc: str = "" - - def __repr__(self) -> str: - return f"" - - -class SemanticSetStore: - def __init__(self): - self._members: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} - self._subject_to_sets: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} - - def add_member(self ,set_id: ProcessLocalId ,subject_id: ProcessLocalId): - self._members.setdefault(set_id ,set()).add(subject_id) - self._subject_to_sets.setdefault(subject_id ,set()).add(set_id) - - def has_member(self ,set_id: ProcessLocalId ,subject_id: ProcessLocalId) -> bool: - return subject_id in self._members.get(set_id ,set()) - - def members(self ,set_id: ProcessLocalId) -> Set[ProcessLocalId]: - return set(self._members.get(set_id ,set())) - - def remove_subject(self ,subject_id: ProcessLocalId): - set_ids = self._subject_to_sets.pop(subject_id ,set()) - for set_id in set_ids: - m = self._members.get(set_id) - if m is not None: - m.discard(subject_id) - if not m: self._members.pop(set_id ,None) diff --git a/developer/authored/deprecated/Syntax.py b/developer/authored/deprecated/Syntax.py deleted file mode 100644 index 4fcfa7c..0000000 --- a/developer/authored/deprecated/Syntax.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - - -""" -Syntax - -RT syntax identity instances. - -We treat "syntax" as AST-level objects: - - kind: official-ish AST node kind name (e.g., "ast.FunctionDef") - - location: file + span - - scope: enclosing syntax identity id (optional) - - parts: mapping of part-name to literal or referenced syntax identity id(s) - -This module does NOT traverse Python programs. It only defines the data model. -""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any ,Dict ,Optional ,Tuple ,Union ,List - -from .ProcessLocalId import ProcessLocalId - - -@dataclass(frozen=True ,slots=True) -class SourceSpan: - file_path: str - lineno: int - col: int - end_lineno: int - end_col: int - - -SyntaxPartValue = Union[ - None - ,bool - ,int - ,float - ,str - ,ProcessLocalId - ,List["SyntaxPartValue"] - ,Dict[str ,"SyntaxPartValue"] -] - - -@dataclass(frozen=True ,slots=True) -class SyntaxInstance: - """ - A single syntax node instance. - - NOTE: many syntax nodes have no identifier-name. Name-like things (identifiers) - appear as child nodes or literals inside `parts`. - """ - kind: str - span: SourceSpan - scope_id: Optional[ProcessLocalId] = None - parts: Dict[str ,SyntaxPartValue] = None diff --git a/developer/authored/tape_machine.py b/developer/authored/tape_machine.py new file mode 100755 index 0000000..d7cb1fe --- /dev/null +++ b/developer/authored/tape_machine.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +import abc +from enum import Enum ,auto + +# ========================================== +# 1. Enums and Constants +# ========================================== + +class Topology(Enum): + NULL = auto() + ,SEGMENT + ,LINEAR_RIGHT + ,CIRCLE + ,LOOP_AND_TAIL + +class Status(Enum): + ABANDONED = auto() + ,EMPTY + ,ACTIVE + +# ========================================== +# 2. Handler Registry (The Method Tape) +# ========================================== + +def handler_hard_error(tm ,*args): + """The default 'Mean' behavior: Hard Stop.""" + raise RuntimeError(f"Operation not permitted on {type(tm).__name__}") + +def handler_ignore(tm ,*args): + """The 'Black Hole' behavior: Do nothing, return None.""" + return None + +def handler_write_list(tm ,val): + """Standard Write for Python List backing.""" + tm.tape[tm.head] = val + +def handler_delete_right_neighbor(tm): + """ + Primitive 'd' command: Deletes the RIGHT neighbor. + Appropriate for Singly Linked Tapes where looking back is hard. + """ + if tm.head + 1 < len(tm.tape): + del tm.tape[tm.head + 1] + else: + # Default behavior when no neighbor exists + pass + +# ========================================== +# 3. Interfaces and Machines +# ========================================== + +class TMInterface(abc.ABC): + """ + The shared interface for First and Second Order machines. + """ + @abc.abstractmethod + def r(self): pass + + @abc.abstractmethod + def w(self ,v): pass + + @abc.abstractmethod + def s(self ,n=1): pass + + @abc.abstractmethod + def d(self): pass + + @abc.abstractmethod + def e(self): pass + + @abc.abstractmethod + def rightmost(self): pass + +class NullTM(TMInterface): + """ + The 'Mean Machine'. + Represents a machine with NO tape. + All operations throw hard errors by default. + """ + def __init__(self): + self._write_handler = handler_hard_error + self._delete_handler = handler_hard_error + self._read_handler = handler_hard_error + self._step_handler = handler_hard_error + + def r(self): return self._read_handler(self) + def w(self ,v): return self._write_handler(self ,v) + def s(self ,n=1): return self._step_handler(self ,n) + def d(self): return self._delete_handler(self) + + def topology(self): return Topology.NULL + def status(self): return Status.EMPTY + + def rightmost(self): return self._read_handler(self) + def e(self): return self._read_handler(self) + +class VanillaTM(TMInterface): + """ + The Plain Vanilla Finite Tape Machine. + Topology: SEGMENT (Finite List). + """ + def __init__(self ,data): + self.tape = data + self.head = 0 + + # Registered Handlers (The Method Tape) + self._write_handler = handler_write_list + self._delete_handler = handler_delete_right_neighbor + + def r(self): + # Critical Loop Speed: No error checking. + return self.tape[self.head] + + def w(self ,v): + return self._write_handler(self ,v) + + def d(self): + return self._delete_handler(self) + + def s(self ,n=1): + # Standard Segment Step + self.head += n + return self + + def e(self): + # Entangle: Create new machine, same tape, same handlers + new_tm = VanillaTM(self.tape) + new_tm.head = self.head + new_tm._write_handler = self._write_handler + new_tm._delete_handler = self._delete_handler + return new_tm + + def rightmost(self): + return self.head >= len(self.tape) - 1 + + def topology(self): + return Topology.SEGMENT + + def set_readonly(self): + """Configuration Helper: Swaps the write handler.""" + self._write_handler = handler_hard_error + +class StatusTM(TMInterface): + """ + Second Order Machine. + Wraps a First Order Machine to handle Lifecycle (Empty/Active). + """ + def __init__(self ,data=None): + if data and len(data) > 0: + self.tm = VanillaTM(data) + self._stat = Status.ACTIVE + else: + self.tm = NullTM() + self._stat = Status.EMPTY + + def empty(self): + return self._stat == Status.EMPTY + + # --- Delegation --- + + def r(self): return self.tm.r() + def w(self ,v): return self.tm.w(v) + def s(self ,n=1): return self.tm.s(n) + def d(self): return self.tm.d() + + def e(self): + if self.empty(): return StatusTM(None) + + # Create new wrapper around entangled inner + new_wrapper = StatusTM.__new__(StatusTM) + new_wrapper.tm = self.tm.e() + new_wrapper._stat = Status.ACTIVE + return new_wrapper + + def rightmost(self): + if self.empty(): return True + return self.tm.rightmost() + +# ========================================== +# 4. Compiler +# ========================================== + +def compile_compound(tm ,command_str): + """ + Compiles a string like 'wsr' or 's-d' into a callable function. + """ + steps = [] + i = 0 + while i < len(command_str): + char = command_str[i] + + if char == 'w': + steps.append(lambda t ,v=None: t.w(v)) + elif char == 'r': + steps.append(lambda t: t.r()) + elif char == 'd': + steps.append(lambda t: t.d()) + elif char == 's': + steps.append(lambda t: t.s(1)) + elif char == '-': + # Lookahead for modifier + if i + 1 < len(command_str): + next_char = command_str[i+1] + if next_char == 's': + steps.append(lambda t: t.s(-1)) + i += 1 + i += 1 + + def compiled_func(value_for_write=None): + res = None + for step in steps: + try: + res = step(tm ,value_for_write) + except TypeError: + res = step(tm) + return res + + return compiled_func + +# ========================================== +# 5. CLI / Verification +# ========================================== + +def CLI(): + print("--- TTCA Tape Machine Verification ---") + + # 1. Setup Data + data = [ + 'A' + ,'B' + ,'C' + ] + stm = StatusTM(data) + + # 2. Config: Make it Read-Only (Runtime Handler Swap) + if not stm.empty(): + stm.tm.set_readonly() + + # 3. The Loop (First-Rest Pattern) + if not stm.empty(): + # First + print(f"First: {stm.r()}") + + # Try Illegal Write + try: + stm.w('Z') + except RuntimeError as e: + print(f"Caught Expected Error: {e}") + + # Rest + while not stm.rightmost(): + stm.s() + print(f"Rest: {stm.r()}") + + # 4. The Null Case + empty_stm = StatusTM([]) + if not empty_stm.empty(): + pass + else: + print("Empty machine handled correctly (skipped).") + + # 5. Delete (The 'd' command) + d_list = [ + '1' + ,'2' + ,'3' + ] + dtm = StatusTM(d_list) + if not dtm.empty(): + print(f"Before Delete: {d_list}") + dtm.d() + print(f"After Delete: {d_list}") + +if __name__ == "__main__": + CLI() + diff --git a/developer/authored/test_SymbolFactory.py b/developer/authored/test_SymbolFactory.py deleted file mode 100755 index 8bc21dc..0000000 --- a/developer/authored/test_SymbolFactory.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python3 -# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- - -""" -Tests for Symbol.Instance constraints. -""" - -import unittest -from Symbol import Symbol - -class TestSymbol(unittest.TestCase): - - def test_null_symbol_exists(self): - """ - Requirement: Symbol.null exists and is Token 0. - """ - null_sym = Symbol.null - self.assertIsNotNone(null_sym) - self.assertEqual(null_sym.name ,"Null") - self.assertEqual(null_sym.doc ,"The Null Symbol") - - def test_distinctness(self): - """ - Requirement: Two distinct originals will always be not equal. - """ - s1 = Symbol.make() - s2 = Symbol.make() - - self.assertNotEqual(s1 ,s2) - self.assertIsNot(s1 ,s2) - self.assertNotEqual(s1 ,Symbol.null) - - def test_name_immutability(self): - """ - Requirement: Name can be set once, but never changed. - """ - # Case 1: Set at creation - s1 = Symbol.make(name="Alpha") - self.assertEqual(s1.name ,"Alpha") - - # Attempt to change - with self.assertRaises(RuntimeError): - s1.name = "Beta" - - # Case 2: Late binding - s2 = Symbol.make() - self.assertIsNone(s2.name) - - s2.name = "Gamma" - self.assertEqual(s2.name ,"Gamma") - - # Attempt to change after late bind - with self.assertRaises(RuntimeError): - s2.name = "Delta" - - def test_doc_immutability(self): - """ - Requirement: Doc can be set once, but never changed. - """ - # Case 1: Set at creation - s1 = Symbol.make(doc="Original Doc") - self.assertEqual(s1.doc ,"Original Doc") - - with self.assertRaises(RuntimeError): - s1.doc = "New Doc" - - # Case 2: Late binding - s2 = Symbol.make() - self.assertEqual(s2.doc ,"") # Default is empty string - - s2.doc = "Late Doc" - self.assertEqual(s2.doc ,"Late Doc") - - with self.assertRaises(RuntimeError): - s2.doc = "Changed Doc" - - def test_doc_empty_string_behavior(self): - """ - Requirement: Setting doc to "" is ignored and does not count as 'setting' it. - """ - s1 = Symbol.make() - - # Setting empty string should be a no-op - s1.doc = "" - self.assertEqual(s1.doc ,"") - - # Should still be able to set it later because "" didn't lock it - s1.doc = "Real Doc" - self.assertEqual(s1.doc ,"Real Doc") - - # NOW it is locked - with self.assertRaises(RuntimeError): - s1.doc = "Trying to change" - - def test_property_access(self): - """ - Requirement: Read/Write via properties. - """ - s = Symbol.make() - s.name = "Velocity" - s.doc = "m/s" - - self.assertEqual(s.name ,"Velocity") - self.assertEqual(s.doc ,"m/s") - self.assertIsInstance(s ,Symbol.Instance) - - def test_opaque_representation(self): - """ - Requirement: repr() reveals no internal token. - """ - s1 = Symbol.make() - self.assertEqual(repr(s1) ,"") - - -if __name__ == '__main__': - unittest.main() diff --git a/developer/authored/tm.py b/developer/authored/tm.py new file mode 100644 index 0000000..a798a51 --- /dev/null +++ b/developer/authored/tm.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +import abc +from enum import Enum ,auto + +# ========================================== +# 1. Enums and Constants +# ========================================== + +class Topology(Enum): + NULL = auto() + SEGMENT = auto() + LINEAR_RIGHT = auto() + CIRCLE = auto() + LOOP_AND_TAIL = auto() + +class Status(Enum): + ABANDONED = auto() + EMPTY = auto() + ACTIVE = auto() + +# ========================================== +# 2. Handler Registry (The Method Tape) +# ========================================== + +def handler_hard_error(tm ,*args): + """The default 'Mean' behavior: Hard Stop.""" + raise RuntimeError(f"Operation not permitted on {type(tm).__name__}") + +def handler_ignore(tm ,*args): + """The 'Black Hole' behavior: Do nothing.""" + return None + +def handler_write_list(tm ,val): + """Standard Write for List backing.""" + # Entanglement Accounting Check + if tm.tape_ref.count > 1: + raise RuntimeError("Cannot Write: Tape is Entangled (Shared)") + tm.tape_ref.data[tm.head] = val + +def handler_delete_right_neighbor_list(tm): + """Delete right neighbor in a list.""" + if tm.tape_ref.count > 1: + raise RuntimeError("Cannot Delete: Tape is Entangled (Shared)") + + # Check neighbor existence + if tm.head + 1 < len(tm.tape_ref.data): + del tm.tape_ref.data[tm.head + 1] + +def handler_write_map(tm ,val): + """Standard Write for Map backing (updates value for current key).""" + if tm.tape_ref.count > 1: + raise RuntimeError("Cannot Write: Tape is Entangled") + + key = tm.address() + tm.tape_ref.data[key] = val + +# ========================================== +# 3. Tape Reference (Entanglement Accounting) +# ========================================== + +class Tape: + """ + Holds the actual data and the entanglement reference count. + """ + def __init__(self ,data): + self.data = data + self.count = 1 # Start with 1 owner + + # For Maps, we might need a stable key view for stepping + self.keys_view = None + if isinstance(data ,dict): + self.keys_view = list(data.keys()) + + def checkout(self): + self.count += 1 + + def checkin(self): + if( self.count > 0 ): + self.count -= 1 + +# ========================================== +# 4. The Machines +# ========================================== + +class TM_Interface(abc.ABC): + @abc.abstractmethod + def r(self): pass + @abc.abstractmethod + def w(self ,v): pass + @abc.abstractmethod + def s(self ,n=1): pass + @abc.abstractmethod + def d(self): pass + @abc.abstractmethod + def e(self): pass + @abc.abstractmethod + def rightmost(self): pass + @abc.abstractmethod + def address(self): pass + @abc.abstractmethod + def dismount(self): pass + +class TM(TM_Interface): + """ + The General Entanglement Accounting Machine. + Specialized for LIST (Sequence) tapes. + """ + def __init__(self ,data): + # If data is already a Tape object (from entanglement), use it. + # Otherwise create new. + if( isinstance(data ,Tape) ): + self.tape_ref = data + else: + self.tape_ref = Tape(data) + + self.head = 0 + + # Registered Handlers + self._write_handler = handler_write_list + self._delete_handler = handler_delete_right_neighbor_list + + def r(self): + return self.tape_ref.data[self.head] + + def w(self ,v): + return self._write_handler(self ,v) + + def d(self): + return self._delete_handler(self) + + def s(self ,n=1): + self.head += n + return self + + def e(self): + # Entangle: Checkout the tape + self.tape_ref.checkout() + + # Create new machine sharing the Tape ref + new_tm = TM(self.tape_ref) + new_tm.head = self.head + + # Copy handlers + new_tm._write_handler = self._write_handler + new_tm._delete_handler = self._delete_handler + return new_tm + + def dismount(self): + self.tape_ref.checkin() + + def rightmost(self): + return self.head >= len(self.tape_ref.data) - 1 + + def address(self): + """For List, address is the integer index.""" + return self.head + + def topology(self): + return Topology.SEGMENT + + +class TM_Map(TM): + """ + Entanglement Accounting Machine specialized for MAPS. + """ + def __init__(self ,data): + super().__init__(data) + self._write_handler = handler_write_map + # Delete on map is complex (removing key affects order), + # disabling default delete for now or need custom handler. + self._delete_handler = handler_hard_error + + def r(self): + # Read Value + key = self.tape_ref.keys_view[self.head] + return self.tape_ref.data[key] + + def address(self): + # Address is the Key + return self.tape_ref.keys_view[self.head] + + def rightmost(self): + return self.head >= len(self.tape_ref.keys_view) - 1 + + def e(self): + self.tape_ref.checkout() + new_tm = TM_Map(self.tape_ref) + new_tm.head = self.head + return new_tm + + +class TM_Null(TM_Interface): + """ + The Mean Machine (No Tape). + """ + def __init__(self): + pass + + def r(self): handler_hard_error(self) + def w(self ,v): handler_hard_error(self) + def s(self ,n=1): handler_hard_error(self) + def d(self): handler_hard_error(self) + def e(self): handler_hard_error(self) + def address(self): handler_hard_error(self) + def dismount(self): pass # No tape to checkin + + def rightmost(self): handler_hard_error(self) + def topology(self): return Topology.NULL + def status(self): return Status.EMPTY + + +class TM2(TM_Interface): + """ + Second Order (Status) Machine. + Wraps a First Order Machine. + """ + def __init__(self ,data=None ,tm_class=TM): + if( data and len(data) > 0 ): + self.tm = tm_class(data) + self._stat = Status.ACTIVE + else: + self.tm = TM_Null() + self._stat = Status.EMPTY + + # Store class for entanglement factories + self._tm_class = tm_class + + def empty(self): + return self._stat == Status.EMPTY + + # --- Delegation --- + + def r(self): return self.tm.r() + def w(self ,v): return self.tm.w(v) + def s(self ,n=1): return self.tm.s(n) + def d(self): return self.tm.d() + def address(self): return self.tm.address() + def dismount(self): self.tm.dismount() + + def e(self): + if( self.empty() ): return TM2(None) + + # Create new wrapper around entangled inner + new_wrapper = TM2.__new__(TM2) + new_wrapper.tm = self.tm.e() + new_wrapper._stat = Status.ACTIVE + new_wrapper._tm_class = self._tm_class + return new_wrapper + + def rightmost(self): + if( self.empty() ): return True + return self.tm.rightmost() + +# ========================================== +# 5. CLI Verification +# ========================================== + +def CLI(): + print("--- TTCA Tape Machine Verification ---") + + # 1. List Machine (TM) + print("\n[Test 1] List Machine (EA)") + data = ['A' ,'B' ,'C'] + tm2 = TM2(data ,TM) + + if( not tm2.empty() ): + # Address check + print(f"Addr {tm2.address()}: {tm2.r()}") + + # Entanglement + tm2_copy = tm2.e() + tm2.s() + print(f"Original Moved -> Addr {tm2.address()}: {tm2.r()}") + print(f"Copy Stayed -> Addr {tm2_copy.address()}: {tm2_copy.r()}") + + # Destructive Check (Should Fail due to 2 owners) + try: + tm2.d() + except RuntimeError as e: + print(f"Caught Expected EA Error: {e}") + + # Dismount copy to allow delete + tm2_copy.dismount() + print("Copy dismounted.") + + # Now delete should work (deletes right neighbor 'C') + tm2.d() + print(f"After Delete: {data}") + + # 2. Map Machine (TM_Map) + print("\n[Test 2] Map Machine") + map_data = {'x': 10 ,'y': 20 ,'z': 30} + tm_map = TM2(map_data ,TM_Map) + + if( not tm_map.empty() ): + print(f"Key {tm_map.address()}: Val {tm_map.r()}") + tm_map.s() + print(f"Key {tm_map.address()}: Val {tm_map.r()}") + +if __name__ == "__main__": + CLI() diff --git a/developer/deprecated/ObjectRegistry.py b/developer/deprecated/ObjectRegistry.py new file mode 100644 index 0000000..9d30fd9 --- /dev/null +++ b/developer/deprecated/ObjectRegistry.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + +""" +ObjectRegistry + +Maps Python runtime objects to ProcessLocalId. + +Strategies: + 1. Weak Identity (Preferred): For objects that support weakrefs (instances ,classes). + - ID lifetime is bound to object lifetime. + - Auto-cleaned on GC. + + 2. Value Identity (Fallback): For immutable primitives (int ,str ,tuple). + - ID is bound to the *value* (hash/equality). + - Stored strongly (since values like 42 or "red" are conceptually eternal). +""" + +from __future__ import annotations + +import weakref +from typing import Any ,Callable ,Dict ,Optional + +from .ProcessLocalId import ProcessLocalIdGenerator ,ProcessLocalId + + +class ObjectRegistry: + def __init__(self ,id_gen: ProcessLocalIdGenerator): + self._id_gen = id_gen + + # Strategy 1: Entities (Weakref-able) + self._obj_to_id_wkd: "weakref.WeakKeyDictionary[Any ,ProcessLocalId]" = weakref.WeakKeyDictionary() + self._id_to_obj_ref: Dict[ProcessLocalId ,weakref.ref] = {} + + # Strategy 2: Values (Hashable ,not weakref-able) + self._value_to_id: Dict[Any ,ProcessLocalId] = {} + + self._finalizers: Dict[ProcessLocalId ,Callable[[ProcessLocalId] ,None]] = {} + self._global_finalizer: Optional[Callable[[ProcessLocalId] ,None]] = None + + def register_finalizer(self ,fn: Callable[[ProcessLocalId] ,None]): + """ + Registers a finalizer callback invoked when any registered *weak-refable* object is GC'd. + """ + self._global_finalizer = fn + + def _on_collect(self ,obj_id: ProcessLocalId): + # Only called for Strategy 1 objects + ref = self._id_to_obj_ref.pop(obj_id ,None) + if(ref is not None): + # The WeakKeyDictionary auto-cleans the forward mapping , + # but we double check or clean any edge cases if needed. + pass + + fn = self._global_finalizer + if(fn is not None): + fn(obj_id) + + def get_id(self ,obj: Any) -> ProcessLocalId: + """ + Returns the ProcessLocalId for `obj` ,registering it if needed. + """ + # 1. Try WeakRef Strategy (Entities) + try: + existing = self._obj_to_id_wkd.get(obj) + if(existing is not None): + return existing + except TypeError: + # obj is not weakref-able (e.g. int ,str ,tuple ,or list). + # Fall through to Strategy 2. + pass + else: + # It IS weakref-able ,but wasn't in the dictionary. Register it. + obj_id = self._id_gen.next_id() + self._obj_to_id_wkd[obj] = obj_id + # Create reverse lookup with callback + self._id_to_obj_ref[obj_id] = weakref.ref(obj ,lambda _ref ,oid=obj_id: self._on_collect(oid)) + return obj_id + + # 2. Try Value Strategy (Primitives) + # Note: Mutable non-weakrefables (like standard lists) will fail here because they are unhashable. + try: + existing = self._value_to_id.get(obj) + if(existing is not None): + return existing + + # Register new value + obj_id = self._id_gen.next_id() + self._value_to_id[obj] = obj_id + return obj_id + except TypeError: + # It is neither weakref-able NOR hashable (e.g. standard list ,dict). + raise TypeError( + f"ObjectRegistry: cannot track object of type {type(obj)!r}. " + "It is neither weakref-able (Entity) nor hashable (Value)." + ) + + def try_get_object(self ,obj_id: ProcessLocalId) -> Optional[Any]: + """ + Best-effort: returns the live object/value. + """ + # Check Entities + ref = self._id_to_obj_ref.get(obj_id) + if(ref is not None): + return ref() + + # Check Values + # For now ,we assume values identify themselves. + for val ,pid in self._value_to_id.items(): + if(pid == obj_id): + return val + + return None diff --git a/developer/deprecated/ProcessLocalId.py b/developer/deprecated/ProcessLocalId.py new file mode 100644 index 0000000..b57c47c --- /dev/null +++ b/developer/deprecated/ProcessLocalId.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + + +""" +ProcessLocalId + +A process-local identifier used as an internal key. + +Design constraint: + - NOT intended to be serialized or persisted. + - `repr()` intentionally does not reveal the numeric token, to discourage logging/persistence. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True ,slots=True) +class ProcessLocalId: + _n: int + + def __repr__(self) -> str: + return "" + + def __str__(self) -> str: + return "" + + def as_int_UNSAFE(self) -> int: + """ + Returns the raw integer token. + + UNSAFE because: + - tokens are process-local + - do not write these into files/databases/logs as stable identifiers + """ + return self._n + + +class ProcessLocalIdGenerator: + """ + Monotonic generator; ids are never recycled. + """ + def __init__(self ,start: int = 1): + if start < 1: raise ValueError("start must be >= 1") + self._next_n: int = start + + def next_id(self) -> ProcessLocalId: + n = self._next_n + self._next_n += 1 + return ProcessLocalId(n) diff --git a/developer/deprecated/Property.py b/developer/deprecated/Property.py new file mode 100644 index 0000000..5b33226 --- /dev/null +++ b/developer/deprecated/Property.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + + +""" +Property + +A Property is itself an entity (it has an Identity id) so that: + - properties can have properties + - properties can be members of semantic sets +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional ,Tuple + +from .ProcessLocalId import ProcessLocalId + + +@dataclass(frozen=True ,slots=True) +class Property: + id: ProcessLocalId + name_path: Tuple[str ,...] + doc: str = "" + + def __repr__(self) -> str: + # name_path is safe to reveal; id token is not. + return f"" diff --git a/developer/deprecated/PropertyManager.py b/developer/deprecated/PropertyManager.py new file mode 100644 index 0000000..36e2b9e --- /dev/null +++ b/developer/deprecated/PropertyManager.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + + +""" +PropertyManager + +Core RT property system. + +Key decisions vs the earlier `property_manager.py`: + - do NOT key by `object_path()` strings (avoids collisions) fileciteturn3file4 + - runtime objects are keyed by weak identity (ProcessLocalId assigned by ObjectRegistry) + - properties are first-class entities (Property has an id), so properties can have properties + +This remains process-local and in-memory. +""" + +from __future__ import annotations + +from typing import Any ,Dict ,Iterable ,List ,Optional ,Tuple ,Union + +from .ProcessLocalId import ProcessLocalIdGenerator ,ProcessLocalId +from .ObjectRegistry import ObjectRegistry +from .PropertyStore import PropertyStore +from .Property import Property +from .SemanticSets import SemanticSet ,SemanticSetStore +from .Syntax import SyntaxInstance + + +NamePathLike = Union[str ,List[str] ,Tuple[str ,...]] + + +class PropertyManager: + def __init__(self): + self._id_gen = ProcessLocalIdGenerator() + self._obj_reg = ObjectRegistry(self._id_gen) + self._store = PropertyStore() + self._sets = SemanticSetStore() + + # Declare-by-name registry + self._name_path_to_property: Dict[Tuple[str ,...],Property] = {} + self._property_id_to_property: Dict[ProcessLocalId ,Property] = {} + + self._name_path_to_set: Dict[Tuple[str ,...],SemanticSet] = {} + self._set_id_to_set: Dict[ProcessLocalId ,SemanticSet] = {} + + # Optional syntax instances (if user chooses to model them) + self._syntax_id_to_instance: Dict[ProcessLocalId ,SyntaxInstance] = {} + + # Finalization cleanup + self._obj_reg.register_finalizer(self._on_subject_finalized) + + def _on_subject_finalized(self ,subject_id: ProcessLocalId): + self._store.remove_subject(subject_id) + self._sets.remove_subject(subject_id) + + def _normalize_name_path(self ,name_path: NamePathLike) -> Tuple[str ,...]: + if isinstance(name_path ,tuple): return name_path + if isinstance(name_path ,list): return tuple(name_path) + if isinstance(name_path ,str): return tuple(name_path.split(".")) + raise TypeError("name_path must be str ,list[str] ,or tuple[str ,...]") + + # ------------------------- + # Identity acquisition + # ------------------------- + def id_of_py_object(self ,obj: Any) -> ProcessLocalId: + return self._obj_reg.get_id(obj) + + def create_syntax_identity(self ,syntax: SyntaxInstance) -> ProcessLocalId: + sid = self._id_gen.next_id() + self._syntax_id_to_instance[sid] = syntax + return sid + + def try_get_syntax(self ,syntax_id: ProcessLocalId) -> Optional[SyntaxInstance]: + return self._syntax_id_to_instance.get(syntax_id) + + # ------------------------- + # Property declaration + # ------------------------- + def declare_property(self ,name_path: NamePathLike ,doc: str = "") -> ProcessLocalId: + np = self._normalize_name_path(name_path) + existing = self._name_path_to_property.get(np) + if existing is not None: return existing.id + pid = self._id_gen.next_id() + p = Property(pid ,np ,doc) + self._name_path_to_property[np] = p + self._property_id_to_property[pid] = p + return pid + + def property_id(self ,name_path: NamePathLike) -> ProcessLocalId: + np = self._normalize_name_path(name_path) + p = self._name_path_to_property.get(np) + if p is None: raise KeyError(f"Property not declared: {np!r}") + return p.id + + def try_get_property(self ,prop_id: ProcessLocalId) -> Optional[Property]: + return self._property_id_to_property.get(prop_id) + + # ------------------------- + # Semantic sets + # ------------------------- + def declare_set(self ,name_path: NamePathLike ,doc: str = "") -> ProcessLocalId: + np = self._normalize_name_path(name_path) + existing = self._name_path_to_set.get(np) + if existing is not None: return existing.id + sid = self._id_gen.next_id() + s = SemanticSet(sid ,np ,doc) + self._name_path_to_set[np] = s + self._set_id_to_set[sid] = s + return sid + + def add_to_set(self ,subject: Any ,set_id: ProcessLocalId): + subject_id = self._coerce_subject_id(subject) + self._sets.add_member(set_id ,subject_id) + + def is_in_set(self ,subject: Any ,set_id: ProcessLocalId) -> bool: + subject_id = self._coerce_subject_id(subject) + return self._sets.has_member(set_id ,subject_id) + + def members(self ,set_id: ProcessLocalId) -> List[ProcessLocalId]: + return list(self._sets.members(set_id)) + + # ------------------------- + # Set/get properties + # ------------------------- + def set(self ,subject: Any ,prop: Union[ProcessLocalId ,NamePathLike] ,value: Any): + subject_id = self._coerce_subject_id(subject) + prop_id = self._coerce_property_id(prop) + self._store.set(subject_id ,prop_id ,value) + + def get(self ,subject: Any ,prop: Union[ProcessLocalId ,NamePathLike] ,default: Any = None) -> Any: + subject_id = self._coerce_subject_id(subject) + prop_id = self._coerce_property_id(prop) + return self._store.get(subject_id ,prop_id ,default) + + def has(self ,subject: Any ,prop: Union[ProcessLocalId ,NamePathLike]) -> bool: + subject_id = self._coerce_subject_id(subject) + prop_id = self._coerce_property_id(prop) + return self._store.has(subject_id ,prop_id) + + def subjects_with(self ,prop: Union[ProcessLocalId ,NamePathLike]) -> List[ProcessLocalId]: + prop_id = self._coerce_property_id(prop) + return list(self._store.subjects_with(prop_id)) + + # ------------------------- + # Coercions + # ------------------------- + def _coerce_subject_id(self ,subject: Any) -> ProcessLocalId: + if isinstance(subject ,ProcessLocalId): return subject + # For Python runtime objects, we require weakref-able instances. + return self._obj_reg.get_id(subject) + + def _coerce_property_id(self ,prop: Union[ProcessLocalId ,NamePathLike]) -> ProcessLocalId: + if isinstance(prop ,ProcessLocalId): return prop + return self.property_id(prop) diff --git a/developer/deprecated/PropertyStore.py b/developer/deprecated/PropertyStore.py new file mode 100644 index 0000000..bc8f3f3 --- /dev/null +++ b/developer/deprecated/PropertyStore.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + + +""" +PropertyStore + +Stores property values and maintains reverse lookups. + +This is intentionally process-local and in-memory. +""" + +from __future__ import annotations + +from typing import Any ,Dict ,Optional ,Set ,Tuple + +from .ProcessLocalId import ProcessLocalId + + +class PropertyStore: + def __init__(self): + # (subject_id ,property_id) -> value + self._values: Dict[Tuple[ProcessLocalId ,ProcessLocalId] ,Any] = {} + + # subject_id -> set(property_id) + self._subject_to_props: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} + + # property_id -> set(subject_id) + self._prop_to_subjects: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} + + def set(self ,subject_id: ProcessLocalId ,prop_id: ProcessLocalId ,value: Any): + key = (subject_id ,prop_id) + self._values[key] = value + self._subject_to_props.setdefault(subject_id ,set()).add(prop_id) + self._prop_to_subjects.setdefault(prop_id ,set()).add(subject_id) + + def get(self ,subject_id: ProcessLocalId ,prop_id: ProcessLocalId ,default: Any = None) -> Any: + return self._values.get((subject_id ,prop_id) ,default) + + def has(self ,subject_id: ProcessLocalId ,prop_id: ProcessLocalId) -> bool: + return (subject_id ,prop_id) in self._values + + def subjects_with(self ,prop_id: ProcessLocalId) -> Set[ProcessLocalId]: + return set(self._prop_to_subjects.get(prop_id ,set())) + + def props_of(self ,subject_id: ProcessLocalId) -> Set[ProcessLocalId]: + return set(self._subject_to_props.get(subject_id ,set())) + + def remove_subject(self ,subject_id: ProcessLocalId): + """ + Remove all stored properties for a subject (used on finalization). + """ + prop_ids = self._subject_to_props.pop(subject_id ,set()) + for prop_id in prop_ids: + self._values.pop((subject_id ,prop_id) ,None) + s = self._prop_to_subjects.get(prop_id) + if s is not None: + s.discard(subject_id) + if not s: self._prop_to_subjects.pop(prop_id ,None) diff --git a/developer/deprecated/SemanticSets.py b/developer/deprecated/SemanticSets.py new file mode 100644 index 0000000..f0fdf48 --- /dev/null +++ b/developer/deprecated/SemanticSets.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + + +""" +SemanticSets + +Membership sets over identities. Used for semantic typing. + +Design: + - set_id identifies the set + - members are subject ids + - reverse index for cleanup +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict ,Optional ,Set + +from .ProcessLocalId import ProcessLocalId + + +@dataclass(frozen=True ,slots=True) +class SemanticSet: + id: ProcessLocalId + name_path: tuple[str ,...] + doc: str = "" + + def __repr__(self) -> str: + return f"" + + +class SemanticSetStore: + def __init__(self): + self._members: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} + self._subject_to_sets: Dict[ProcessLocalId ,Set[ProcessLocalId]] = {} + + def add_member(self ,set_id: ProcessLocalId ,subject_id: ProcessLocalId): + self._members.setdefault(set_id ,set()).add(subject_id) + self._subject_to_sets.setdefault(subject_id ,set()).add(set_id) + + def has_member(self ,set_id: ProcessLocalId ,subject_id: ProcessLocalId) -> bool: + return subject_id in self._members.get(set_id ,set()) + + def members(self ,set_id: ProcessLocalId) -> Set[ProcessLocalId]: + return set(self._members.get(set_id ,set())) + + def remove_subject(self ,subject_id: ProcessLocalId): + set_ids = self._subject_to_sets.pop(subject_id ,set()) + for set_id in set_ids: + m = self._members.get(set_id) + if m is not None: + m.discard(subject_id) + if not m: self._members.pop(set_id ,None) diff --git a/developer/deprecated/Syntax.py b/developer/deprecated/Syntax.py new file mode 100644 index 0000000..4fcfa7c --- /dev/null +++ b/developer/deprecated/Syntax.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# -*- mode: python; coding: utf-8; python-indent-offset: 2 -*- + + +""" +Syntax + +RT syntax identity instances. + +We treat "syntax" as AST-level objects: + - kind: official-ish AST node kind name (e.g., "ast.FunctionDef") + - location: file + span + - scope: enclosing syntax identity id (optional) + - parts: mapping of part-name to literal or referenced syntax identity id(s) + +This module does NOT traverse Python programs. It only defines the data model. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any ,Dict ,Optional ,Tuple ,Union ,List + +from .ProcessLocalId import ProcessLocalId + + +@dataclass(frozen=True ,slots=True) +class SourceSpan: + file_path: str + lineno: int + col: int + end_lineno: int + end_col: int + + +SyntaxPartValue = Union[ + None + ,bool + ,int + ,float + ,str + ,ProcessLocalId + ,List["SyntaxPartValue"] + ,Dict[str ,"SyntaxPartValue"] +] + + +@dataclass(frozen=True ,slots=True) +class SyntaxInstance: + """ + A single syntax node instance. + + NOTE: many syntax nodes have no identifier-name. Name-like things (identifiers) + appear as child nodes or literals inside `parts`. + """ + kind: str + span: SourceSpan + scope_id: Optional[ProcessLocalId] = None + parts: Dict[str ,SyntaxPartValue] = None