JSON data processing tools for AI agents.
Install Dependencies
pip install praisonaiagents
Import Components
# Core orchestration classes plus the six JSON tools used by the agent below.
from praisonaiagents import Agent, Task, PraisonAIAgents
from praisonaiagents.tools import (
    read_json,
    write_json,
    merge_json,
    validate_json,
    analyze_json,
    transform_json,
)
Create Agent
# Agent equipped with every JSON tool; self-reflection is switched off.
json_agent = Agent(
    name="JSONProcessor",
    role="JSON Processing Specialist",
    goal="Process JSON files efficiently and accurately.",
    backstory="Expert in JSON file manipulation and validation.",
    tools=[
        read_json,
        write_json,
        merge_json,
        validate_json,
        analyze_json,
        transform_json,
    ],
    self_reflect=False,
)
Define Task
# Task handed to json_agent; `name` labels it as "json_processing".
json_task = Task(
    description="Parse and validate API response data.",
    expected_output="Validated and processed JSON data.",
    agent=json_agent,
    name="json_processing",
)
Run Agent
# Bundle the agent and its task into a sequential workflow, then run it.
agents = PraisonAIAgents(
    agents=[json_agent],
    tasks=[json_task],
    process="sequential",
)
agents.start()
Agent(tools=[read_json, write_json, merge_json, validate_json, analyze_json, transform_json])
Task(description="json_operation")
process="sequential"
indent=2, sort_keys=True
# Each JSON tool can be imported individually from praisonaiagents.tools.
from praisonaiagents.tools import read_json
from praisonaiagents.tools import write_json
from praisonaiagents.tools import merge_json
from praisonaiagents.tools import validate_json
from praisonaiagents.tools import analyze_json
from praisonaiagents.tools import transform_json
# Example: reading a JSON file, with and without schema validation.

# Basic usage
data = read_json("config.json")

# With schema validation
schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer"},
    },
}
data = read_json(
    "user.json",
    encoding="utf-8",
    validate_schema=schema,
)
# Returns: Dict[str, Any] (JSON data)
# Error case: Returns dict with 'error' key
# Example: writing JSON data to a file, with and without formatting options.

# Basic usage
data = {"name": "Alice", "age": 30}
success = write_json(data, "output.json")

# With formatting options
data = {
    "users": [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25},
    ]
}
success = write_json(
    data,
    "users.json",
    indent=4,
    sort_keys=True,
    ensure_ascii=False,
)
# Returns: bool (True if successful)
# Example: merging several JSON files into one output file.

# Merge configuration files
success = merge_json(
    files=["config1.json", "config2.json"],
    output_file="merged_config.json",
)

# Advanced merge with options
success = merge_json(
    files=["base.json", "override.json"],
    output_file="final.json",
    merge_arrays=True,
    overwrite_duplicates=False,
)
# Returns: bool (True if successful)
# Example: validating in-memory data or a JSON file against a schema.

# Schema used for validation
schema = {
    "type": "object",
    "required": ["name", "email"],
    "properties": {
        "name": {"type": "string"},
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 0},
    },
}

# Validate data directly
data = {"name": "Alice", "email": "alice@example.com", "age": 25}
is_valid, error = validate_json(data, schema)

# Validate file
is_valid, error = validate_json("user.json", schema)
# Returns: Tuple[bool, Optional[str]]
# Example: analyzing the structure of a JSON file or an in-memory object.

# Analyze file
analysis = analyze_json("data.json", max_depth=5)

# Analyze data structure
data = {
    "users": [
        {"name": "Alice", "scores": [95, 87, 92]},
        {"name": "Bob", "scores": [88, 85, 90]},
    ]
}
analysis = analyze_json(data)
# Returns: Dict[str, Any]
# Example output:
# {
#     'analysis_time': '2023-01-01T12:00:00',
#     'structure': {
#         'type': 'dict',
#         'size': 1,
#         'keys': ['users'],
#         'children': {
#             'users': {
#                 'type': 'list',
#                 'length': 2,
#                 'element_types': ['dict'],
#                 'sample_elements': [...]
#             }
#         }
#     }
# }
# Example: applying a list of transformations to a JSON file or object.

# Transformations to apply, in order; "path" uses dotted keys.
transformations = [
    {
        "operation": "set",
        "path": "user.settings.theme",
        "value": "dark",
    },
    {
        "operation": "move",
        "path": "old.config",
        "value": "new.config",
    },
    {
        "operation": "delete",
        "path": "temporary.data",
    },
]

# Transform file
result = transform_json("config.json", transformations)

# Transform data directly
data = {"user": {"name": "Alice"}}
result = transform_json(data, transformations)
# Returns: Dict[str, Any] (transformed data)
from praisonaiagents import Agent
from praisonaiagents.tools import (
    read_json,
    write_json,
    merge_json,
    validate_json,
    analyze_json,
    transform_json,
)

# Agent configured with all six JSON tools.
agent = Agent(
    name="JSONProcessor",
    description="An agent that processes JSON data",
    tools=[
        read_json,
        write_json,
        merge_json,
        validate_json,
        analyze_json,
        transform_json,
    ],
)
# Example: merge layered config files, then validate the merged result.

# Merge multiple config files
success = merge_json(
    ["base_config.json", "env_config.json", "user_config.json"],
    "final_config.json",
    merge_arrays=True,
    overwrite_duplicates=True,
)

# Validate configuration
schema = {
    "type": "object",
    "required": ["database", "api"],
    "properties": {
        "database": {
            "type": "object",
            "required": ["host", "port"],
        },
        "api": {
            "type": "object",
            "required": ["key"],
        },
    },
}
is_valid, error = validate_json("final_config.json", schema)
# Example: inspect a large dataset and report its top-level size and keys.

# Analyze data structure
analysis = analyze_json("large_dataset.json", max_depth=3)
print(f"Dataset size: {analysis['structure']['size']} entries")
print(f"Available fields: {analysis['structure']['keys']}")
# Example: rename/move/delete fields in a file and save the result.

# Transform data format
transformations = [
    {"operation": "rename", "path": "firstName", "value": "first_name"},
    {"operation": "rename", "path": "lastName", "value": "last_name"},
    {"operation": "move", "path": "address.zip", "value": "address.postal_code"},
    {"operation": "delete", "path": "temporary_data"},
]
result = transform_json("user_data.json", transformations)
write_json(result, "transformed_data.json")
from praisonaiagents import Agent, Task, PraisonAIAgents
from praisonaiagents.tools import read_json, write_json, validate_json

# Create JSON processing agent
json_agent = Agent(
    name="JSONExpert",
    role="JSON Processor",
    goal="Process and validate JSON data efficiently.",
    backstory="Expert in JSON processing and validation.",
    tools=[read_json, write_json, validate_json],
    self_reflect=False,
)

# Define JSON task
json_task = Task(
    description="Process and validate configuration files.",
    expected_output="Validated JSON configuration.",
    agent=json_agent,
    name="config_validation",
)

# Run agent
agents = PraisonAIAgents(
    agents=[json_agent],
    tasks=[json_task],
    process="sequential",
)
agents.start()
# Imports are repeated here so this example runs on its own.
from praisonaiagents import Agent, Task, PraisonAIAgents
from praisonaiagents.tools import (
    validate_json,
    analyze_json,
    transform_json,
    merge_json,
)

# Create JSON validation agent
validation_agent = Agent(
    name="Validator",
    role="JSON Validation Specialist",
    goal="Ensure JSON data integrity and schema compliance.",
    tools=[validate_json, analyze_json],
    self_reflect=False,
)

# Create JSON transformation agent
transform_agent = Agent(
    name="Transformer",
    role="JSON Transformation Specialist",
    goal="Transform and merge JSON data structures.",
    tools=[transform_json, merge_json],
    self_reflect=False,
)

# Define tasks
validation_task = Task(
    description="Validate JSON data",
    agent=validation_agent,
    name="json_validation",
)
transform_task = Task(
    description="Transform JSON data",
    agent=transform_agent,
    name="json_transformation",
)

# Run agents
agents = PraisonAIAgents(
    agents=[validation_agent, transform_agent],
    tasks=[validation_task, transform_task],
    process="sequential",
)
agents.start()
Agent Configuration
# Reference agent configuration listing all six JSON tools.
Agent(
    name="JSONProcessor",
    role="JSON Processing Specialist",
    goal="Process JSON files accurately and safely",
    tools=[
        read_json,
        write_json,
        merge_json,
        validate_json,
        analyze_json,
        transform_json,
    ],
)
Task Definition
# Reference task definition: what to do and what output is expected.
Task(
    description="Parse and validate API response data",
    expected_output="Validated data structure",
)
# Pattern: one agent processes JSON files, a second validates the output.

# Processing agent
processor = Agent(
    name="Processor",
    role="JSON Processor",
    tools=[
        read_json,
        write_json,
        merge_json,
        validate_json,
        analyze_json,
        transform_json,
    ],
)

# Validation agent
validator = Agent(
    name="Validator",
    role="Data Validator",
)

# Define tasks
process_task = Task(
    description="Process JSON files",
    agent=processor,
)
validate_task = Task(
    description="Validate processed data",
    agent=validator,
)

# Run workflow
agents = PraisonAIAgents(
    agents=[processor, validator],
    tasks=[process_task, validate_task],
)
# Constructing the workflow does not execute it; start() kicks it off,
# matching the other examples in this document.
agents.start()