# Examples

This page contains detailed examples showing how to use `gym-tl-tools` in different scenarios.
## Multi-Objective Task

An example with multiple objectives that must be achieved in sequence:
```python
from gym_tl_tools import BaseVarValueInfoGenerator, Predicate

# Predicates for a multi-stage task
atomic_predicates = [
    Predicate(name="pickup_item", formula="has_item > 0.5"),
    Predicate(name="deliver_item", formula="at_delivery_zone > 0.5"),
    Predicate(name="battery_charged", formula="battery_level > 0.3"),
]


class MultiObjectiveVarGenerator(BaseVarValueInfoGenerator):
    def get_var_values(self, env, obs, info):
        # Map environment signals to the variables referenced by the predicates
        return {
            "has_item": float(info.get("carrying_item", False)),
            "at_delivery_zone": float(info.get("in_delivery_zone", False)),
            "battery_level": obs.get("battery", 0.0),
        }


# Sequential task: pick up the item, then deliver it, while maintaining battery charge
tl_spec = "F(pickup_item & F(deliver_item)) & G(battery_charged)"
```
## Safe Exploration

An example emphasizing safety constraints during exploration:
```python
import numpy as np

from gym_tl_tools import BaseVarValueInfoGenerator, Predicate, RewardConfig

# Safety-focused predicates
atomic_predicates = [
    Predicate(name="goal_reached", formula="d_goal < 1.0"),
    Predicate(name="safe_from_cliff", formula="d_cliff > 2.0"),
    Predicate(name="safe_speed", formula="speed < 3.0"),
    Predicate(name="collision_free", formula="d_obstacle > 1.0"),
]


class SafetyVarGenerator(BaseVarValueInfoGenerator):
    def get_var_values(self, env, obs, info):
        position = obs.get("position", [0, 0])
        velocity = obs.get("velocity", [0, 0])
        return {
            "d_goal": np.linalg.norm(
                np.array(position) - np.array(info.get("goal_position", [0, 0]))
            ),
            "d_cliff": info.get("distance_to_cliff", float("inf")),
            "speed": np.linalg.norm(velocity),
            "d_obstacle": info.get("min_obstacle_distance", float("inf")),
        }


# Reach the goal while maintaining all safety constraints
tl_spec = "F(goal_reached) & G(safe_from_cliff & safe_speed & collision_free)"

# Use strict safety rewards
reward_config = RewardConfig(
    terminal_state_reward=20.0,       # High reward for success
    state_trans_reward_scale=200.0,   # High penalty for safety violations
    dense_reward=False,               # Sparse rewards for clearer safety signals
)
```
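The reward configuration would then be handed to the wrapper. The `reward_config` keyword below is an assumption about the `TLObservationReward` signature, since the other examples on this page do not show where it is passed; check the wrapper's documentation for the exact argument name:

```python
import gymnasium as gym  # or `import gym`, depending on your setup

from gym_tl_tools import TLObservationReward

env = gym.make("YourEnv-v0")  # placeholder environment ID
wrapped_env = TLObservationReward(
    env,
    tl_spec=tl_spec,
    atomic_predicates=atomic_predicates,
    var_value_info_generator=SafetyVarGenerator(),  # assumed to take an instance
    reward_config=reward_config,  # assumed keyword; verify against the actual signature
)
```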
## Custom Parser Example

Using a custom parser with additional operators:
```python
import numpy as np

from gym_tl_tools import Parser, ParserSymbol, TLObservationReward

# Create a custom parser with an additional operator
custom_parser = Parser()

# Add a custom operator for "until" (U).
# Note: this pointwise function is only a simplified placeholder
# (min(y, max(x, y)) reduces to y); true "until" semantics require
# evaluation over a trace.
custom_parser.symbols["U"] = ParserSymbol(
    priority=2,
    func=lambda x, y: np.minimum(y, np.maximum(x, y)),  # simplified "until"
)

# Use the custom parser in the wrapper
# (`env`, `atomic_predicates`, and `var_generator` are defined as in the earlier examples)
wrapped_env = TLObservationReward(
    env,
    tl_spec="safe_speed U goal_reached",  # maintain a safe speed until the goal is reached
    atomic_predicates=atomic_predicates,
    var_value_info_generator=var_generator,
    parser=custom_parser,
)
```
## Working with Different Observation Spaces

Examples for different types of observation spaces:
```python
from gym_tl_tools import BaseVarValueInfoGenerator


# For Dict observation spaces
class DictObsVarGenerator(BaseVarValueInfoGenerator):
    def get_var_values(self, env, obs, info):
        # obs is already a dict
        return {
            "robot_x": obs["robot"]["position"][0],
            "robot_y": obs["robot"]["position"][1],
            "target_distance": obs["sensors"]["target_distance"],
        }
```

```python
import numpy as np

from gym_tl_tools import BaseVarValueInfoGenerator


# For Box observation spaces
class BoxObsVarGenerator(BaseVarValueInfoGenerator):
    def get_var_values(self, env, obs, info):
        # obs is a flat numpy array; the indices depend on your environment's layout
        return {
            "position_x": obs[0],
            "position_y": obs[1],
            "velocity": np.linalg.norm(obs[2:4]),
            "sensor_reading": obs[4],
        }
```
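Before training, it can help to sanity-check a generator against a hand-built observation. The array below is made up for illustration and follows the index layout assumed by `BoxObsVarGenerator` above:

```python
import numpy as np

# Hypothetical flat Box observation: [x, y, vx, vy, sensor]
sample_obs = np.array([1.0, 2.0, 0.5, -0.5, 0.3])

generator = BoxObsVarGenerator()
var_values = generator.get_var_values(env=None, obs=sample_obs, info={})
print(var_values)
# Expected keys: position_x, position_y, velocity, sensor_reading
```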
## Error Handling and Debugging

An example with proper error handling and debugging:
```python
import numpy as np

from gym_tl_tools import BaseVarValueInfoGenerator


class DebugVarGenerator(BaseVarValueInfoGenerator):
    def get_var_values(self, env, obs, info):
        try:
            var_values = {
                "d_goal": info["distance_to_goal"],
                "d_obstacle": info["distance_to_obstacle"],
            }
            # Validate values
            for key, value in var_values.items():
                if not isinstance(value, (int, float)):
                    raise ValueError(
                        f"Variable {key} must be numeric, got {type(value)}"
                    )
                if np.isnan(value) or np.isinf(value):
                    print(f"Warning: {key} has non-finite value {value}")
            return var_values
        except KeyError as e:
            raise ValueError(f"Required key missing from info: {e}")
        except Exception as e:
            print(f"Error in variable extraction: {e}")
            # Return default values to prevent a crash
            return {
                "d_goal": float("inf"),
                "d_obstacle": float("inf"),
            }
```
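A short random-rollout smoke test can surface missing `info` keys before a long training run. This sketch assumes `wrapped_env` was built with `DebugVarGenerator`, as in the earlier `TLObservationReward` examples, and that the wrapper follows the Gymnasium five-value step API:

```python
# Assumes `wrapped_env` wraps your base env with DebugVarGenerator()
# and follows the Gymnasium step API (five return values).
obs, info = wrapped_env.reset()
for _ in range(10):
    action = wrapped_env.action_space.sample()
    obs, reward, terminated, truncated, info = wrapped_env.step(action)
    if terminated or truncated:
        obs, info = wrapped_env.reset()
```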
## Integration with Stable-Baselines3

An example showing how to use the wrapper with reinforcement learning libraries:
```python
import gymnasium as gym  # or `import gym`, depending on your setup
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

from gym_tl_tools import TLObservationReward


# Create the wrapped environment
def make_tl_env():
    env = gym.make("YourEnv-v0")
    return TLObservationReward(
        env,
        tl_spec=tl_spec,
        atomic_predicates=atomic_predicates,
        var_value_info_generator=var_generator,
    )


# Create a vectorized environment
vec_env = make_vec_env(make_tl_env, n_envs=4)

# Train with PPO
model = PPO("MlpPolicy", vec_env, verbose=1)
model.learn(total_timesteps=100_000)

# Evaluate (a VecEnv returns batched observations, rewards, and dones)
obs = vec_env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, rewards, dones, infos = vec_env.step(action)
    if dones.any():
        break
```
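Standard Stable-Baselines3 callbacks work with the wrapped environment as well. A sketch using `EvalCallback` is shown below; the save path and filename are illustrative:

```python
from stable_baselines3.common.callbacks import EvalCallback

# Separate evaluation environment, built the same way as the training envs
eval_env = make_vec_env(make_tl_env, n_envs=1)

eval_callback = EvalCallback(
    eval_env,
    best_model_save_path="./logs/best_model",  # illustrative path
    eval_freq=10_000,
    deterministic=True,
)

model.learn(total_timesteps=100_000, callback=eval_callback)
model.save("ppo_tl_agent")  # illustrative filename
```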