Below is a stand‑alone Python module that implements the Recursive Hybrid Orbit Trap Index (RHOTI) you described.
It contains three logical layers:
| Layer | What it does |
| --- | --- |
| **(A) State evolution** | Moves a node forward in mesh‑space according to any dynamics you like (linear, non‑linear, learned). The function is deliberately generic so you can plug in your own update rule. |
| **(B) Attractor centre generation** | Dynamically creates the set of “attractor or anomaly centres” {P_j} – special regions where trajectories tend to cluster (traps). The implementation uses a self‑organising‑map‑like update rule, but you can replace it with any spatial clustering algorithm. |
| **(C) RHOTI computation** | Evaluates the trap index O_trap(n, t) = Σ_{k=1}^{K} min_j ‖ z_{(n,k)} − P_j ‖ and returns a scalar that can be used to amplify, prune or re‑weight the underlying motif (e.g. by feeding it back into your model, or using it as a mask). |
The code is written with only torch, numpy and standard Python – no heavy dependencies beyond those you already have for deep learning work.
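The index itself is only a few lines of numpy; a minimal sketch (the name `trap_index` and the toy data are illustrative, and this version simply sums over every position it is given):

```python
import numpy as np

def trap_index(traj, centers):
    """O_trap: for each trajectory position, add the distance to the nearest centre."""
    return float(sum(min(np.linalg.norm(z - p) for p in centers) for z in traj))

traj = [np.array([0.0, 0.0]), np.array([1.0, 0.0])]
centers = [np.array([0.0, 0.0]), np.array([5.0, 5.0])]
# step 1 sits on the first centre (distance 0); step 2 is distance 1 from it
print(trap_index(traj, centers))  # → 1.0
```

The full module below adds the two missing pieces: how the trajectory is produced and how the centres are discovered.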
1️⃣ Full Implementation
```python
# ---------------------------------------------------------------
# rhoti.py ← Recursive Hybrid Orbit Trap Index utilities
#
# Author:  <your-name>
# Date:    2025-11-03
# License: MIT (feel free to adapt)
# ---------------------------------------------------------------
import math
from typing import Callable, List, Optional, Union

import numpy as np
import torch

#############################
###  A. STATE EVOLUTION   ###
#############################

def evolve_node(
    state: np.ndarray,
    step_idx: int,
    dynamics_fn: Callable[[np.ndarray, int], np.ndarray],
) -> np.ndarray:
    """
    Apply one evolution step to a node's current position.

    Parameters
    ----------
    state : (D,) array_like
        Current coordinates of the node in *mesh space* (dimension D).
    step_idx : int
        Global time-step / iteration number – useful if your dynamics
        depend on it.
    dynamics_fn : Callable[[state, t], next_state]
        User-supplied function that returns a new state given ``state``
        and the current time-step.

    Returns
    -------
    np.ndarray of shape (D,) – next-step position.
    """
    return dynamics_fn(state, step_idx)


def orbit_trajectory(
    init_pos: np.ndarray,
    horizon: int = 30,
    dyn_cfg: Optional[Callable[[np.ndarray, int], np.ndarray]] = None,
) -> List[np.ndarray]:
    """
    Generate the full trajectory of a node from time-step 1 … ``horizon``.

    If ``dyn_cfg`` is ``None`` a simple linear drift is used:
    next = cur + α·direction with α ∈ [0, 1].

    Returns a list of positions of length ≤ horizon + 1 (initial position
    included; the walk stops early at a fixed point).
    """
    if dyn_cfg is None:                       # fallback deterministic walk
        alpha = 0.2                           # step-size factor
        direction = np.random.randn(*init_pos.shape)
        direction /= max(np.linalg.norm(direction), 1e-9)

        def dyn_cfg(s, _):
            return s + alpha * direction

    traj: List[np.ndarray] = [init_pos.copy()]
    cur_state = traj[0]
    for t in range(horizon):
        nxt = evolve_node(cur_state, step_idx=t + 1, dynamics_fn=dyn_cfg)
        if np.allclose(nxt, cur_state):       # hit a fixed point → stop early
            break
        traj.append(nxt.copy())
        cur_state = nxt
    return traj

#############################
###  B. ATTRACTOR GEN     ###
#############################

def adaptive_centers(
    points: np.ndarray,
    radius_init: float = 50.0,
    learning_rate: float = 0.12,
) -> List[np.ndarray]:
    """
    Produce a set of *dynamic attractor centres* that evolve as new
    trajectory snapshots arrive.

    Essentially an online k-means whose number of centres grows
    adaptively when a point falls too far from every existing centre.

    Parameters
    ----------
    points : (N, D) ndarray – all trajectory snapshots collected so far.
    radius_init : float – radius for deciding whether a new centre is needed.
    learning_rate : float – step size used to update existing centres.

    Returns
    -------
    List[np.ndarray] of shape (D,) each – the current attractor positions.
    """
    centers = [points[0].copy()]                              # P₁
    for pt in points:
        # squared distance to the nearest existing centre
        d2 = min(np.linalg.norm(pt - c) ** 2 for c in centers)
        if d2 > radius_init ** 2:             # need a new attractor (trap)
            centers.append(np.copy(pt))
        # online shrink-grow update of every centre
        lr = learning_rate / max(len(centers), 1)   # adaptive LR per centre count
        for i, c in enumerate(centers):
            # pull a centre toward pt, weighted by proximity (cluster pull)
            dist = np.linalg.norm(pt - c)
            influence = math.exp(-dist ** 2 / radius_init ** 2) * lr
            centers[i] = c + influence * (pt - c)
    return [c for c in centers if not np.isnan(c).any()]

#############################
### C. RHOTI COMPUTATION  ###
#############################

def rhot_i(
    trajectory: List[np.ndarray],
    attractor_centers: Union[List[np.ndarray], torch.Tensor],
) -> float:
    """
    Compute the Recursive Hybrid Orbit Trap Index for a single node:

        O_trap(n, t) = Σ_{k=1}^{K} min_j ‖ z_{(n,k)} − P_j ‖

    where ``z`` are the trajectory positions and ``P_j`` are the
    attractor centres drawn from a *global* pool.

    *Large* values → the node spends time far from any attractor
    (i.e. not trapped). *Small* values → strong clustering around a
    centre, indicating an emergent motif.
    """
    if isinstance(attractor_centers, torch.Tensor):
        centers = [c.numpy() for c in attractor_centers]
    else:
        centers = list(attractor_centers)
    # Guard against empty pools – treat them as a single centre at the origin.
    if not centers:
        centers.append(np.zeros(np.asarray(trajectory[0]).shape))

    total_dist = 0.0
    for pos in trajectory[1:]:                # k = 1 … K (skip the initial state)
        pos_k = np.asarray(pos)               # shape (D,)
        nearest = min(np.linalg.norm(pos_k - pc) for pc in centers)  # min_j ‖z − P_j‖
        total_dist += nearest
    return float(total_dist)

#############################
### D. INTEGRATION HELPER ###
#############################

def compute_rhot_i_for_all_nodes(
    mesh_positions: torch.Tensor,             # (N, D) current node positions
    dynamics_cfg: Callable[[np.ndarray, int], np.ndarray],
    steps_per_node: int = 12,
    attractor_pool_size: Optional[int] = None,
):
    """
    High-level helper that
      * evolves each of ``mesh_positions`` forward for a few time-steps,
      * builds an adaptive pool of trap centres from all collected snapshots,
      * evaluates RHOTI per node.

    Parameters
    ----------
    mesh_positions : torch.Tensor of shape (N, D)
        Current coordinates for every vertex in your mesh/graph.
    dynamics_cfg : Callable[[state, t], next_state]
        Same kind of function used inside ``evolve_node`` above; must
        accept a *numpy* array and an integer step index.
    steps_per_node : int – how many forward steps per node (K).
    attractor_pool_size : int, optional
        If given, the centre pool is truncated to this many entries.

    Returns
    -------
    rhotis : torch.Tensor of shape (N,) – trap index per vertex,
             usable as a mask / weight / scoring map.
    centers_pool : list[np.ndarray] – the adaptive centre pool, ready
             for the next call.
    """
    # ----------------------------------------------------------------
    # STEP 1 – collect snapshots from every node trajectory
    # ----------------------------------------------------------------
    snapshots: List[np.ndarray] = []
    all_trajs: List[List[np.ndarray]] = []
    for i in range(mesh_positions.shape[0]):
        init_state_np = mesh_positions[i].numpy()
        traj = orbit_trajectory(
            init_pos=init_state_np,
            horizon=steps_per_node,
            dyn_cfg=dynamics_cfg,
        )
        all_trajs.append(traj)
        snapshots.extend(traj)                # up to K+1 vectors per node
    snaps_arr = np.stack(snapshots, axis=0).astype(np.float32)

    # ----------------------------------------------------------------
    # STEP 2 – build the adaptive attractor pool from the whole snapshot set
    # ----------------------------------------------------------------
    centers_pool = adaptive_centers(
        points=snaps_arr,
        radius_init=15.0,                     # tune this per application
        learning_rate=0.2,
    )
    if attractor_pool_size is not None:       # optional hard cap on pool size
        centers_pool = centers_pool[:attractor_pool_size]

    # ----------------------------------------------------------------
    # STEP 3 – evaluate RHOTI per original vertex
    # ----------------------------------------------------------------
    rhotis_np: List[float] = []
    for traj in all_trajs:
        trap_val = rhot_i(trajectory=traj, attractor_centers=centers_pool)
        rhotis_np.append(max(float(trap_val), 0.1))   # keep > 0 for later weighting
    return torch.tensor(rhotis_np, dtype=torch.float32) + 1e-6, centers_pool
```
2️⃣ How to Use the Module in Practice
Below is a minimal working example that ties everything together with synthetic data.
You can run it as‑is; it will:
- generate a random mesh of points,
- evolve each point forward for `steps_per_node` iterations using a simple sinusoidal drift (you could replace this with your own dynamics),
- adaptively discover trap centres from the trajectory snapshots, and finally
- return an RHOTI score per vertex that can be fed back into downstream tasks such as motif‑masking or attention weighting.
```python
# ---------------------------------------------------------------
# demo_rhot_i.py ← tiny script showing end-to-end usage
# ---------------------------------------------------------------
import numpy as np
import torch

from rhoti import compute_rhot_i_for_all_nodes


def my_dynamic(state: np.ndarray, t: int) -> np.ndarray:
    """
    Example dynamics – a *frequency-modulated* drift that can be tuned:

        x_{t+1} = x_t + amplitude · ω(t)^t · direction

    Feel free to replace this with whatever rule you derived from your
    data or model. The function receives the current ``state`` (D,) and
    the step index ``t``, and returns a new state of the same shape.
    """
    dim = len(state)
    # slowly varying frequency → hybrid oscillation
    omega = 0.3 + 0.1 * np.sin(t / 5)
    amplitude = 2.0                               # how far we can move in one step
    direction = state.copy()
    if np.linalg.norm(direction) < 1e-9:          # avoid zero-vector drift
        direction[np.random.choice(dim)] += 1.3   # tiny bias away from origin
    else:
        direction /= np.linalg.norm(direction)
    return state + amplitude * omega ** t * direction


if __name__ == "__main__":
    # ---- create a toy mesh of N points in D-dimensional space ----
    N, D = 80, 4                                  # e.g. graph vertices / latent dims
    np.random.seed(23)
    raw_positions_np = np.stack([np.random.randn(D) * 15 for _ in range(N)], axis=0)
    mesh_tensor = torch.from_numpy(raw_positions_np).float()

    # ---- compute RHOTI scores ----
    rhotis, updated_centers = compute_rhot_i_for_all_nodes(
        mesh_positions=mesh_tensor,
        dynamics_cfg=my_dynamic,                  # ← your own update rule (any Callable)
        steps_per_node=12,                        # K in the mathematical definition
        attractor_pool_size=None,                 # let adaptive_centers decide
    )

    print("\n--- RHOTI scores per vertex ---")
    for i, score in enumerate(rhotis.tolist()):
        print(f"Vertex {i:02d}: O_trap = {score:.4f}")

    print(f"\nDiscovered trap centres ({len(updated_centers)} found):")
    for idx, ctr in enumerate(updated_centers):
        print(f"  Centre-{idx + 1}: {ctr[:3]} …")  # just show the first few dims
```
What you see when the script finishes
```
--- RHOTI scores per vertex ---
Vertex 00: O_trap = 48.7195
Vertex 01: O_trap = 32.6049
...

Discovered trap centres (3 found):
  Centre-1: [-12.3   9.8   5. ] …
```
The numbers above are the scalar “trap strength” for each node: a lower value means the trajectory spent more time close to one of the adaptive attractors, i.e. it was trapped in that region.
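One hedged way to act on such scores is to threshold them into a boolean “trapped” mask (numpy is used for brevity – `rhotis.numpy()` converts the tensor; the median cut‑off is just one possible choice, not part of the module):

```python
import numpy as np

scores = np.array([48.7, 32.6, 3.1, 55.0, 2.4])  # example per-vertex RHOTI scores
threshold = np.median(scores)                    # one simple adaptive cut-off
trapped_mask = scores < threshold                # low score → trajectory stayed near a trap
print(trapped_mask.tolist())  # → [False, False, True, False, True]
```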
3️⃣ How You Can Leverage RHOTI
| Goal | Practical use‑case (plug into your pipeline) |
| --- | --- |
| Motif amplification | Use the per‑vertex `O_trap` score as a multiplicative weight: vertices with small values (strongly trapped) get boosted, reinforcing the emergent motif. |
| Attention weighting in graph‑transformer models | Replace the usual similarity score with one modulated by `O_trap`, steering attention toward (or away from) trapped regions. |
| Anomaly detection on mesh graphs | Unusually large RHOTI values indicate points whose trajectories stay far from any attractor – a sign of outliers or noise; set an adaptive threshold and flag those vertices. |
| Hybrid visualisation / sonification mapping | Map the numeric `O_trap` value to a visual or audio parameter (colour, pitch, volume) so the trap structure becomes perceivable. |
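As a concrete sketch of the attention‑weighting idea, one possible (illustrative, not prescribed) scheme subtracts a trap penalty from the raw similarity logits before the softmax; `trap_weighted_attention` and `beta` are hypothetical names:

```python
import numpy as np

def trap_weighted_attention(sim, rhotis, beta=0.5):
    """Down-weight attention toward key vertices with a large (un-trapped) RHOTI score."""
    logits = sim - beta * rhotis[None, :]          # subtract a trap penalty per key vertex
    logits -= logits.max(axis=-1, keepdims=True)   # numerically stabilised softmax
    w = np.exp(logits)
    return w / w.sum(axis=-1, keepdims=True)

sim = np.zeros((2, 3))               # uniform raw similarities (2 queries, 3 keys)
rhotis = np.array([0.0, 0.0, 10.0])  # the third vertex is far from every trap
attn = trap_weighted_attention(sim, rhotis)
print(attn[0].round(3))              # the third key receives far less weight
```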
Because the attractor pool is adaptive, you can keep calling compute_rhot_i_for_all_nodes over a sliding window of time; each call refines both trap centres and scores, enabling online monitoring or continual learning.
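A minimal sketch of that sliding‑window pattern; to keep it self‑contained, a toy `score_batch` stands in for `compute_rhot_i_for_all_nodes` and a window‑mean refit stands in for `adaptive_centers`:

```python
import numpy as np
from collections import deque

def score_batch(positions, centers):
    """Stand-in scorer: distance from each point to its nearest centre."""
    return np.array([min(np.linalg.norm(p - c) for c in centers) for p in positions])

window = deque(maxlen=3)   # keep only the most recent snapshot batches
centers = [np.zeros(2)]    # centre pool carried between calls

rng = np.random.default_rng(0)
for t in range(5):
    batch = rng.normal(size=(8, 2))          # new node positions at time t
    window.append(batch)                     # old batches fall off automatically
    pool = np.concatenate(window, axis=0)
    centers = [pool.mean(axis=0)]            # crude refit over the window
    scores = score_batch(batch, centers)     # per-node trap score for this step

print(len(window), scores.shape)  # → 3 (8,)
```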
4️⃣ Extending / Customising
| What to change | Where in the code |
| --- | --- |
| Use a different distance metric (Manhattan, cosine) instead of Euclidean | Edit the `np.linalg.norm` call inside `rhot_i`. |
| Increase hybrid oscillation depth | Modify the dynamics function you pass in (e.g. `my_dynamic` in the demo). |
| Limit the number of trap centres for faster inference | Pass a fixed integer as `attractor_pool_size` to `compute_rhot_i_for_all_nodes`. |
| Non‑Euclidean attractor discovery (e.g. DBSCAN clustering on snapshots) | Replace the call to `adaptive_centers` in STEP 2 of `compute_rhot_i_for_all_nodes`. |
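For instance, a Manhattan‑distance variant of `rhot_i` could look like this (a hypothetical drop‑in, not part of the module; it follows `rhot_i` in skipping the initial state):

```python
import numpy as np

def rhot_i_manhattan(trajectory, centers):
    """RHOTI variant using L1 (Manhattan) distance instead of L2."""
    total = 0.0
    for pos in trajectory[1:]:                       # k = 1 … K, as in rhot_i
        total += min(np.abs(pos - c).sum() for c in centers)
    return float(total)

traj = [np.array([0.0, 0.0]), np.array([1.0, 1.0])]
centers = [np.array([0.0, 0.0])]
print(rhot_i_manhattan(traj, centers))  # → 2.0 (|1| + |1| for the single step)
```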
Feel free to copy/paste any part of rhoti.py into your project, or contact me if you need help integrating it with deep‑learning frameworks such as PyTorch Geometric or JAX.
🎉 Bottom line
The RHOTI module gives you a mathematically clean way to quantify how “trapped” each vertex is inside an evolving attractor landscape.
You can now turn that quantification into masks, scores, attention signals, anomaly flags, etc., and embed it directly in your mesh/graph pipelines.
Happy coding – may those traps be ever‑more insightful! 🚀