1. 適用版本
適用于RPA2020.4以及以上版本,
2. 接口 API 手冊
調用方式及字段,請參考論壇手冊:
暫未在論壇公開,請向當?shù)貜S商技術索取
3. Python 調用代碼
如果想用機器人來調 API 接口,可參考以下代碼,其它語言邏輯一樣:
import json
import time
import requests
from urllib import parse
import hmac
import base64
from hashlib import sha256
def rpa_rest_2020_4(host='',rest_type='',data_json=None,mode='',port=443,accesstoken=None,retry=2):
'''
host:地址,str型,示例:'https://192.168.202.11'
rest_type:str型,接口類型,示例:'/oapi/v1/job/create'
data_json:字典型,發(fā)送的報文數(shù)據(jù)json格式
mode:接口請求方式(get、post、delete及put)
port:int型,https端口
retry:int型,重試次數(shù)
返回值:get_field_json:字典型,
'''
get_field_json={'code': 40, 'msg': 'fail,意外情況','result':None}
#url參數(shù)轉換
def json2Params(data_json):
get_field_json={'code': 41, 'msg': 'fail,轉換URL參數(shù)失敗!','result':None}
try:
data_json=json.loads(data_json)
url_str = ''
nums = 0
max_nums = len(data_json)
for key in data_json:
nums += 1
if nums == max_nums:
url_str += str(key) + '=' + str(data_json[key])
else:
url_str += str(key) + '=' + str(data_json[key]) + '&'
except Exception as e:
print('參數(shù)轉化失敗:',e)
url_str=''
finally:
return url_str
if mode != 'get' and mode != 'put' and mode != 'post' and mode != 'delete':
get_field_json={'code': 42, 'msg': 'fail,mode錯誤,只能為get、put、post或delete!','result':None}
return get_field_json
#獲得簽名sign
sign = ''
sign_yc = ''
url = host
if port != 443:
url = url + ':' + str(port)
url += rest_type
timestamp = None
#獲取簽名值sign:
try:
get_field_json={'code': 41, 'msg': 'fail,獲取簽名失敗!','result':None}
#獲得毫秒級時間戳(時間出入不能大于10分鐘)
timestamp=str(int(round(time.time() * 1000)))
#處理json數(shù)據(jù)不為str類型的情況
for key in data_json:
data_json[key]=str(data_json[key])
#json排序
data_json=json.dumps(data_json,sort_keys=True)
#構造源串sign_yc
if mode == 'get':
json_str=json2Params(data_json)
sign_yc=json_str + '&token=' + accesstoken + '×tamp=' + timestamp
url = url+'?'+json_str
else:
sign_yc=str(data_json) + '&token=' + accesstoken + '×tamp=' + timestamp
print('源串:',sign_yc)
print('URL:',url)
#URL編碼
url_bm = parse.quote(sign_yc, encoding="utf-8")
#sha256加密密碼
byte_key = bytes(accesstoken, encoding="utf-8")
byte_url_bm = bytes(url_bm, encoding="utf-8")
hn256 = hmac.new(byte_key, byte_url_bm, digestmod=sha256)
hh256 = hn256.hexdigest()
#Base64編碼
bb64 = base64.b64encode(bytes(hh256, encoding="utf-8"))
#獲取sign
sign = str(bb64, "utf-8")
#替換rn
sign=sign.replace('rn', '')
print('簽名值sign:',sign)
except Exception as e:
print(e)
return get_field_json
#開始嘗試發(fā)送API
print('開始嘗試發(fā)送API')
for count in range(retry):
print('嘗試第',count+1,'次API請求任務。')
try:
print('開始嘗試:',mode)
#post、delete、put
if mode!='get':
header_dict = {'Content-Type': 'application/json;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}
if mode=='post':
if rest_type == '/oapi/v1/job/create':
header_dict = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}
json_str=json2Params(data_json)
url = url+'?'+json_str
res = requests.post(url, data=str(data_json), headers=header_dict,verify=False)
else:
json_str=json2Params(data_json)
url = url+'?'+json_str
header_dict = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}
res = requests.post(url, data=str(data_json), headers=header_dict,verify=False)
if mode=='put':
res = requests.put(url, data=str(data_json), headers=header_dict,verify=False)
if mode=='delete':
res = requests.delete(url, data=str(data_json), headers=header_dict,verify=False)
if mode=='get':
header_dict = {'accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}
res = requests.get(url,headers=header_dict,verify=False)
#獲取返回值
res_text=res.text
#轉化成json
get_field_json=json.loads(res_text)
print('請求成功!')
break
except Exception as e:
print('請求失?。?#39;,e)
get_field_json={'code': 40, 'msg': 'fail,發(fā)送API失敗!','result':None}
return get_field_json
def get_token(host='',port='',accessKey='',secretKey='',retry=2):
'''
host:地址,str型,示例:'https://192.168.202.11'
port:int型,https端口
accessKey:str型,服務平臺應用AppKey
secretKey:str型,服務平臺應用AppSecret
retry:int型,重試次數(shù)
'''
get_field_json={'code': 44, 'msg': 'fail,獲取token失敗','result':None}
for i in range(retry):
try:
url = host
if port != 443:
url = url + ':' + str(port)
json_str='accessKey='+accessKey+'&secretKey='+secretKey
url = url+'/oapi/v1/token?'+json_str
res = requests.get(url,verify=False)
res_text=res.text
#轉化成json
get_field_json=json.loads(res_text)
print('獲取token,第'+str(i+1)+'次,成功')
break
except Exception as e:
print('獲取token,第'+str(i+1)+'次,失敗',e)
time.sleep(1)
return get_field_json
def refresh_token(host='',port='',refleshtoken='',retry=2):
'''
host:地址,str型,示例:'https://192.168.202.11'
port:int型,https端口
refleshtoken:str型,刷新token
retry:int型,重試次數(shù)
'''
get_field_json={'code': 44, 'msg': 'fail,刷新token失敗','result':None}
for i in range(retry):
try:
url = host
if port != 443:
url = url + ':' + str(port)
json_str='refreshToken='+refleshtoken
url = url+'/oapi/v1/token?'+json_str
res = requests.get(url,verify=False)
res_text=res.text
#轉化成json
get_field_json=json.loads(res_text)
print('刷新token,第'+str(i+1)+'次,成功')
break
except Exception as e:
print('刷新token,第'+str(i+1)+'次,失敗',e)
time.sleep(1)
return get_field_json
4. 其它平臺或客戶端調用
4.1 其它平臺調用
按照第 4 章的邏輯自行寫調用代碼即可。
4.2 機器人調用
按照第 4 章添加一個全局函數(shù),在需要調用的地方使用全局函數(shù)控件即可。
注:需要提前獲取token后調用,如無第三方平臺對接,獲取的token可存在共享變量里,分配權限調用,參考下圖:

本文由網(wǎng)上采集發(fā)布,不代表我們立場,轉載聯(lián)系作者并注明出處:http://m.zltfw.cn/shbk/37302.html