完整的代码示例,帮助你快速集成 jisuapi Agent 服务。
import requests
APPKEY = "你的APPKEY"
BASE_URL = "https://api.jisuapi.com/agent"
headers = {
"Authorization": f"Bearer {APPKEY}",
"Content-Type": "application/json"
}
# 1. 搜索工具
response = requests.post(f"{BASE_URL}/search",
headers=headers,
json={"query": "查询手机号归属地", "limit": 3})
tools = response.json()["result"]["tools"]
print(f"找到 {len(tools)} 个工具")
# 2. 执行工具
if tools:
tool_id = tools[0]["tool_id"]
response = requests.post(f"{BASE_URL}/execute",
headers=headers,
json={"tool_id": tool_id, "params": {"mobile": "13800138000"}})
print(response.json()) import requests
from typing import Dict, Any, Optional
class JisuAPIAgent:
def __init__(self, appkey: str):
self.appkey = appkey
self.base_url = "https://api.jisuapi.com/agent"
self.headers = {
"Authorization": f"Bearer {appkey}",
"Content-Type": "application/json"
}
def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
"""搜索可用工具"""
try:
response = requests.post(
f"{self.base_url}/search",
headers=self.headers,
json={"query": query, "limit": limit},
timeout=30
)
response.raise_for_status()
result = response.json()
if result["status"] == 0:
return result["result"]
else:
raise Exception(f"搜索失败: {result['msg']}")
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败: {e}")
def execute(self, tool_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""执行工具"""
try:
response = requests.post(
f"{self.base_url}/execute",
headers=self.headers,
json={"tool_id": tool_id, "params": params},
timeout=30
)
response.raise_for_status()
result = response.json()
if result["status"] == 0:
return result
else:
raise Exception(f"执行失败: {result['msg']}")
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败: {e}")
def search_and_execute(self, query: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""搜索并执行第一个匹配的工具"""
# 搜索工具
search_result = self.search(query, limit=1)
tools = search_result.get("tools", [])
if not tools:
print(f"未找到匹配的工具: {query}")
return None
# 执行第一个工具
tool = tools[0]
print(f"找到工具: {tool['name']} ({tool['tool_id']})")
result = self.execute(tool["tool_id"], params)
return result
# 使用示例
if __name__ == "__main__":
agent = JisuAPIAgent("你的APPKEY")
# 查询手机号归属地
result = agent.search_and_execute(
query="查询手机号归属地",
params={"mobile": "13800138000"}
)
if result:
print("查询结果:", result["data"])
print("计费信息:", result["meta"]["charge"]) import aiohttp
import asyncio
from typing import Dict, Any
class AsyncJisuAPIAgent:
def __init__(self, appkey: str):
self.appkey = appkey
self.base_url = "https://api.jisuapi.com/agent"
self.headers = {
"Authorization": f"Bearer {appkey}",
"Content-Type": "application/json"
}
async def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/search",
headers=self.headers,
json={"query": query, "limit": limit}
) as response:
result = await response.json()
if result["status"] == 0:
return result["result"]
else:
raise Exception(f"搜索失败: {result['msg']}")
async def execute(self, tool_id: str, params: Dict[str, Any]) -> Dict[str, Any]:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/execute",
headers=self.headers,
json={"tool_id": tool_id, "params": params}
) as response:
result = await response.json()
if result["status"] == 0:
return result
else:
raise Exception(f"执行失败: {result['msg']}")
# 使用示例
async def main():
agent = AsyncJisuAPIAgent("你的APPKEY")
# 并发查询多个
tasks = [
agent.search_and_execute("手机号归属地", {"mobile": "13800138000"}),
agent.search_and_execute("天气查询", {"city": "北京"}),
]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main()) const axios = require('axios');
const APPKEY = '你的APPKEY';
const BASE_URL = 'https://api.jisuapi.com/agent';
const headers = {
'Authorization': `Bearer ${APPKEY}`,
'Content-Type': 'application/json'
};
// 搜索并执行工具
(async () => {
try {
// 1. 搜索工具
const searchRes = await axios.post(`${BASE_URL}/search`, {
query: '查询手机号归属地',
limit: 3
}, { headers });
const tools = searchRes.data.result.tools;
console.log(`找到 ${tools.length} 个工具`);
// 2. 执行工具
if (tools.length > 0) {
const execRes = await axios.post(`${BASE_URL}/execute`, {
tool_id: tools[0].tool_id,
params: { mobile: '13800138000' }
}, { headers });
console.log('查询结果:', execRes.data.data);
console.log('计费信息:', execRes.data.meta.charge);
}
} catch (error) {
console.error('请求失败:', error.message);
}
})(); const axios = require('axios');
class JisuAPIAgent {
constructor(appkey) {
this.appkey = appkey;
this.baseUrl = 'https://api.jisuapi.com/agent';
this.headers = {
'Authorization': `Bearer ${appkey}`,
'Content-Type': 'application/json'
};
}
async search(query, limit = 5) {
try {
const response = await axios.post(
`${this.baseUrl}/search`,
{ query, limit },
{ headers: this.headers, timeout: 30000 }
);
const result = response.data;
if (result.status === 0) {
return result.result;
} else {
throw new Error(`搜索失败: ${result.msg}`);
}
} catch (error) {
throw new Error(`请求失败: ${error.message}`);
}
}
async execute(toolId, params) {
try {
const response = await axios.post(
`${this.baseUrl}/execute`,
{ tool_id: toolId, params },
{ headers: this.headers, timeout: 30000 }
);
const result = response.data;
if (result.status === 0) {
return result;
} else {
throw new Error(`执行失败: ${result.msg}`);
}
} catch (error) {
throw new Error(`请求失败: ${error.message}`);
}
}
async searchAndExecute(query, params) {
// 搜索工具
const searchResult = await this.search(query, 1);
const tools = searchResult.tools || [];
if (tools.length === 0) {
console.log(`未找到匹配的工具: ${query}`);
return null;
}
// 执行第一个工具
const tool = tools[0];
console.log(`找到工具: ${tool.name} (${tool.tool_id})`);
const result = await this.execute(tool.tool_id, params);
return result;
}
}
// 使用示例
(async () => {
const agent = new JisuAPIAgent('你的APPKEY');
// 查询手机号归属地
const result = await agent.searchAndExecute(
'查询手机号归属地',
{ mobile: '13800138000' }
);
if (result) {
console.log('查询结果:', result.data);
console.log('计费信息:', result.meta.charge);
}
})(); import axios, { AxiosInstance } from 'axios';
interface SearchResult {
search_id: string;
total: number;
tools: Tool[];
}
interface Tool {
tool_id: string;
name: string;
description: string;
parameters: Record;
}
interface ExecuteResult {
status: number;
msg: string;
data: any;
meta: {
tool_id: string;
execution_id: string;
charge: any;
};
}
class JisuAPIAgent {
private appkey: string;
private baseUrl: string;
private client: AxiosInstance;
constructor(appkey: string) {
this.appkey = appkey;
this.baseUrl = 'https://api.jisuapi.com/agent';
this.client = axios.create({
baseURL: this.baseUrl,
headers: {
'Authorization': `Bearer ${appkey}`,
'Content-Type': 'application/json'
},
timeout: 30000
});
}
async search(query: string, limit: number = 5): Promise {
const response = await this.client.post('/search', { query, limit });
const result = response.data;
if (result.status === 0) {
return result.result;
} else {
throw new Error(`搜索失败: ${result.msg}`);
}
}
async execute(toolId: string, params: Record): Promise {
const response = await this.client.post('/execute', {
tool_id: toolId,
params
});
const result = response.data;
if (result.status === 0) {
return result;
} else {
throw new Error(`执行失败: ${result.msg}`);
}
}
async searchAndExecute(query: string, params: Record): Promise {
const searchResult = await this.search(query, 1);
const tools = searchResult.tools || [];
if (tools.length === 0) {
console.log(`未找到匹配的工具: ${query}`);
return null;
}
const tool = tools[0];
console.log(`找到工具: ${tool.name} (${tool.tool_id})`);
return await this.execute(tool.tool_id, params);
}
}
export default JisuAPIAgent; package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
type JisuAPIAgent struct {
appkey string
baseURL string
client *http.Client
}
func NewAgent(appkey string) *JisuAPIAgent {
return &JisuAPIAgent{
appkey: appkey,
baseURL: "https://api.jisuapi.com/agent",
client: &http.Client{},
}
}
func (a *JisuAPIAgent) Search(query string, limit int) (map[string]interface{}, error) {
data := map[string]interface{}{
"query": query,
"limit": limit,
}
return a.request("POST", "/search", data)
}
func (a *JisuAPIAgent) Execute(toolID string, params map[string]interface{}) (map[string]interface{}, error) {
data := map[string]interface{}{
"tool_id": toolID,
"params": params,
}
return a.request("POST", "/execute", data)
}
func (a *JisuAPIAgent) request(method, path string, data map[string]interface{}) (map[string]interface{}, error) {
jsonData, _ := json.Marshal(data)
req, _ := http.NewRequest(method, a.baseURL+path, bytes.NewBuffer(jsonData))
req.Header.Set("Authorization", "Bearer "+a.appkey)
req.Header.Set("Content-Type", "application/json")
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
var result map[string]interface{}
json.Unmarshal(body, &result)
return result, nil
}
func main() {
agent := NewAgent("你的APPKEY")
// 搜索工具
searchResult, _ := agent.Search("查询手机号归属地", 3)
fmt.Println(searchResult)
// 执行工具
execResult, _ := agent.Execute("shouji_query", map[string]interface{}{
"mobile": "13800138000",
})
fmt.Println(execResult)
} # .env 文件
JISUAPI_KEY=your_appkey_here
# Python 读取
from dotenv import load_dotenv
import os
load_dotenv()
APPKEY = os.getenv('JISUAPI_KEY') import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def execute_with_log(tool_id, params):
logger.info(f"执行工具: {tool_id}, 参数: {params}")
result = agent.execute(tool_id, params)
logger.info(f"执行成功, request_id: {result.get('request_id')}")
return result from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def execute_with_retry(tool_id, params):
return agent.execute(tool_id, params) 需要帮助?如有其他问题,请联系客服微信或发送邮件至 kf@jisuapi.com


© 2015-2025 杭州极速互联科技有限公司 版权所有 浙ICP备17047587号-4 浙公网安备33010502005096 增值电信业务经营许可证:浙B2-20190875