Agent Policies

Policy system for agent safety, tool usage governance, and escalation management. Provides configurable rules for controlling agent behavior and ensuring compliance.

Core Classes

SafetyPolicy

Description: Policy for input/output safety checks and content filtering

Parameters:

  • blocked_patterns (List[str], optional): Regex patterns to block
  • sensitive_topics (List[str], optional): Sensitive topic keywords
  • pii_patterns (Dict[str, str], optional): PII detection patterns
  • max_response_length (int, optional): Maximum response length
  • enable_content_filtering (bool): Enable content filtering

Returns: SafetyPolicy instance

Example:

from recoagent.agents.policies import SafetyPolicy

# Create safety policy
safety_policy = SafetyPolicy(
    blocked_patterns=[
        r"(?i)(hack|exploit|vulnerability)",
        r"(?i)(malware|virus|trojan)"
    ],
    sensitive_topics=["financial", "medical", "legal"],
    max_response_length=1000,
    enable_content_filtering=True
)

# Evaluate input
result = safety_policy.evaluate({
    "query": "How to hack a system?",
    "context": "User asking about security"
})

ToolPolicy

Description: Policy for controlling tool usage and access

Parameters:

  • allowed_tools (Set[str], optional): Set of allowed tool names
  • blocked_tools (Set[str], optional): Set of blocked tool names
  • tool_usage_limits (Dict[str, int], optional): Usage limits per tool
  • require_approval (Set[str], optional): Tools requiring approval

Returns: ToolPolicy instance

Example:

from recoagent.agents.policies import ToolPolicy

# Create tool policy
tool_policy = ToolPolicy(
    allowed_tools={"retrieval", "web_search"},
    blocked_tools={"escalate"},
    tool_usage_limits={
        "web_search": 3,
        "retrieval": 10
    },
    require_approval={"escalate"}
)

# Check tool access
result = tool_policy.evaluate({
    "tool_name": "web_search",
    "usage_count": 2
})

EscalationPolicy

Description: Policy for determining when to escalate queries

Parameters:

  • escalation_triggers (List[str]): Triggers for escalation
  • confidence_threshold (float): Minimum confidence for auto-handling
  • complexity_threshold (float): Maximum complexity for auto-handling
  • escalation_endpoints (Dict[str, str]): Escalation endpoints by type

Returns: EscalationPolicy instance

Example:

from recoagent.agents.policies import EscalationPolicy

# Create escalation policy
escalation_policy = EscalationPolicy(
    escalation_triggers=[
        "complex_technical_question",
        "sensitive_topic",
        "low_confidence"
    ],
    confidence_threshold=0.7,
    complexity_threshold=0.8,
    escalation_endpoints={
        "technical": "https://api.company.com/tech-support",
        "general": "https://api.company.com/support"
    }
)

# Evaluate escalation need
result = escalation_policy.evaluate({
    "query": "Complex technical question",
    "confidence": 0.5,
    "complexity": 0.9
})

Usage Examples

Basic Safety Policy

from recoagent.agents.policies import SafetyPolicy, PolicyAction

# Create basic safety policy
safety_policy = SafetyPolicy(
    blocked_patterns=[
        r"(?i)(harmful|dangerous|illegal)",
        r"(?i)(personal\s+information|credit\s+card)"
    ],
    max_response_length=500
)

# Test policy
result = safety_policy.evaluate({
    "query": "Tell me about machine learning",
    "context": "Educational query"
})

if result.action == PolicyAction.ALLOW:
    print("Query allowed")
else:
    print(f"Query blocked: {result.reason}")

Advanced Tool Policy

from recoagent.agents.policies import ToolPolicy

# Create comprehensive tool policy
tool_policy = ToolPolicy(
    allowed_tools={"retrieval", "web_search", "calculator"},
    blocked_tools={"escalate", "admin_tools"},
    tool_usage_limits={
        "web_search": 5,
        "retrieval": 20,
        "calculator": 10
    },
    require_approval={"escalate", "admin_tools"}
)

# Check tool usage
result = tool_policy.evaluate({
    "tool_name": "web_search",
    "usage_count": 3,
    "user_role": "standard"
})

print(f"Action: {result.action}")
print(f"Reason: {result.reason}")

Multi-Policy System

from recoagent.agents.policies import SafetyPolicy, ToolPolicy, EscalationPolicy

# Create policy system
policies = {
    "safety": SafetyPolicy(
        blocked_patterns=[r"(?i)(harmful|illegal)"],
        max_response_length=1000
    ),
    "tools": ToolPolicy(
        allowed_tools={"retrieval", "web_search"},
        tool_usage_limits={"web_search": 3}
    ),
    "escalation": EscalationPolicy(
        confidence_threshold=0.6,
        escalation_triggers=["low_confidence", "complex_query"]
    )
}

# Evaluate with multiple policies
def evaluate_query(query_data):
    results = {}
    for policy_name, policy in policies.items():
        results[policy_name] = policy.evaluate(query_data)
    return results

# Use policy system
query_data = {
    "query": "What is machine learning?",
    "confidence": 0.8,
    "tools_used": ["retrieval"]
}

results = evaluate_query(query_data)
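
The per-policy results then need to be reduced to a single decision. One reasonable rule is "most restrictive action wins". The sketch below illustrates that precedence logic; the `PolicyAction` enum here is a self-contained stand-in assumed to mirror the one in `recoagent.agents.policies`, and the exact severity ordering is an assumption, not the library's documented behavior:

```python
from enum import Enum

# Stand-in for recoagent.agents.policies.PolicyAction
# (assumption: the real enum has at least these members).
class PolicyAction(Enum):
    ALLOW = "allow"
    FILTER = "filter"
    ESCALATE = "escalate"
    BLOCK = "block"

# Assumed precedence: the most restrictive action across all policies wins.
_SEVERITY = {
    PolicyAction.ALLOW: 0,
    PolicyAction.FILTER: 1,
    PolicyAction.ESCALATE: 2,
    PolicyAction.BLOCK: 3,
}

def combine_actions(actions):
    """Return the most restrictive action among the per-policy results."""
    return max(actions, key=lambda a: _SEVERITY[a])

final = combine_actions([PolicyAction.ALLOW, PolicyAction.ESCALATE])
```

With this rule a single BLOCK from any policy overrides every ALLOW, which is usually the safe default for guardrail systems.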

Custom Policy Implementation

import re
from typing import Any, Dict

from recoagent.agents.policies import BasePolicy, PolicyResult, PolicyAction

class CustomBusinessPolicy(BasePolicy):
    """Custom policy for business-specific rules."""

    def __init__(self, business_rules: Dict[str, str]):
        self.business_rules = business_rules

    def evaluate(self, input_data: Dict[str, Any]) -> PolicyResult:
        """Evaluate against business rules."""
        query = input_data.get("query", "")

        # Check business-specific patterns
        for rule_name, pattern in self.business_rules.items():
            if re.search(pattern, query, re.IGNORECASE):
                return PolicyResult(
                    action=PolicyAction.BLOCK,
                    reason=f"Violates business rule: {rule_name}",
                    confidence=0.9,
                    metadata={"rule": rule_name}
                )

        return PolicyResult(
            action=PolicyAction.ALLOW,
            reason="Passes business rules",
            confidence=1.0,
            metadata={}
        )

# Create custom policy
business_policy = CustomBusinessPolicy({
    "competitor_mention": r"(?i)(competitor|rival)",
    "pricing_inquiry": r"(?i)(price|cost|quote)"
})

# Use custom policy
result = business_policy.evaluate({
    "query": "What are your prices?",
    "context": "Customer inquiry"
})

API Reference

PolicyResult

Fields:

  • action (PolicyAction): ALLOW, BLOCK, ESCALATE, or FILTER
  • reason (str): Explanation for the decision
  • confidence (float): Confidence in the decision (0.0-1.0)
  • metadata (Dict): Additional metadata
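
The fields above can be modeled as a small dataclass. The snippet below is an illustrative stand-in, not the library's actual definition, showing how the four fields fit together:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict

# Stand-in enum; assumed to mirror recoagent.agents.policies.PolicyAction.
class PolicyAction(Enum):
    ALLOW = "allow"
    BLOCK = "block"
    ESCALATE = "escalate"
    FILTER = "filter"

@dataclass
class PolicyResult:
    action: PolicyAction                            # decision taken by the policy
    reason: str                                     # human-readable explanation
    confidence: float                               # confidence in the decision, 0.0-1.0
    metadata: Dict[str, Any] = field(default_factory=dict)  # extra context

result = PolicyResult(PolicyAction.ALLOW, "No blocked patterns matched", 1.0)
```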

SafetyPolicy Methods

evaluate(input_data: Dict) -> PolicyResult

Evaluate input against safety rules

Parameters:

  • input_data (Dict): Input data with query, context, etc.

Returns: PolicyResult with action and reasoning

add_pattern(pattern: str) -> None

Add new blocked pattern

Parameters:

  • pattern (str): Regex pattern to add

remove_pattern(pattern: str) -> None

Remove blocked pattern

Parameters:

  • pattern (str): Regex pattern to remove
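
These two methods presumably maintain the policy's internal list of blocked patterns. The self-contained sketch below shows how such add/remove bookkeeping might work; `PatternRegistry` is a hypothetical stand-in, not the library's implementation:

```python
import re
from typing import List

class PatternRegistry:
    """Illustrative stand-in for SafetyPolicy's blocked-pattern bookkeeping."""

    def __init__(self, patterns: List[str] = ()):
        self._patterns: List[str] = list(patterns)

    def add_pattern(self, pattern: str) -> None:
        re.compile(pattern)  # validate eagerly: raises re.error on bad regex
        if pattern not in self._patterns:
            self._patterns.append(pattern)

    def remove_pattern(self, pattern: str) -> None:
        if pattern in self._patterns:
            self._patterns.remove(pattern)

    def matches(self, text: str) -> bool:
        """True if any blocked pattern matches the text."""
        return any(re.search(p, text) for p in self._patterns)

registry = PatternRegistry([r"(?i)harmful"])
registry.add_pattern(r"(?i)credit\s+card")
```

Validating the regex at add time (rather than at match time) surfaces configuration errors as early as possible.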

ToolPolicy Methods

evaluate(input_data: Dict) -> PolicyResult

Evaluate tool usage request

Parameters:

  • input_data (Dict): Tool usage data

Returns: PolicyResult with action

is_tool_allowed(tool_name: str) -> bool

Check if tool is allowed

Parameters:

  • tool_name (str): Name of tool

Returns: True if allowed

get_usage_limit(tool_name: str) -> int

Get usage limit for tool

Parameters:

  • tool_name (str): Name of tool

Returns: Usage limit
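
Together, these helpers support a pre-flight check before invoking a tool. A hedged sketch of that combined logic follows; `can_invoke` is a hypothetical free function, and treating a missing limit as "unlimited" is an assumption about the library's semantics:

```python
from typing import Dict, Optional, Set

def can_invoke(tool_name: str,
               usage_count: int,
               allowed_tools: Set[str],
               blocked_tools: Set[str],
               usage_limits: Dict[str, int]) -> bool:
    """Illustrative pre-flight check combining allow/block lists and usage limits."""
    if tool_name in blocked_tools:
        return False
    if allowed_tools and tool_name not in allowed_tools:
        return False
    limit: Optional[int] = usage_limits.get(tool_name)  # assumption: None = unlimited
    return limit is None or usage_count < limit

ok = can_invoke("web_search", 2, {"web_search", "retrieval"},
                {"escalate"}, {"web_search": 3})
```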

EscalationPolicy Methods

evaluate(input_data: Dict) -> PolicyResult

Evaluate escalation need

Parameters:

  • input_data (Dict): Query and context data

Returns: PolicyResult with escalation decision

should_escalate(confidence: float, complexity: float) -> bool

Check if escalation is needed

Parameters:

  • confidence (float): Response confidence
  • complexity (float): Query complexity

Returns: True if escalation needed
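
Given the thresholds passed to the constructor, this check plausibly reduces to a pair of comparisons. A minimal sketch, assuming escalation triggers when confidence falls below its threshold or complexity rises above its threshold (behavior exactly at the thresholds is an assumption):

```python
def should_escalate(confidence: float,
                    complexity: float,
                    confidence_threshold: float = 0.7,
                    complexity_threshold: float = 0.8) -> bool:
    """Escalate when the agent is not confident enough, or the query is too complex."""
    return confidence < confidence_threshold or complexity > complexity_threshold
```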

See Also