Summary: The Python code below performs API calls to Flink's products and saves the results on a .json file.
Observation: The code uses our /toolbox/ instance. Please adjust the variables in session 3 to test it with your instances.

1. Code #1: Testing the GenerateAuthorizeToken, Authorize, GetAccountsDetail, GetAccountsDetailsAsync and GetAllAttributes APIs
###1.LIBRARIES###
######################## Libraries ########################
import requests
import time
import json
###2.FUNCTIONS###
######################## DELAY ########################
def delay(seconds):
print("Waiting for the delay of", seconds, "seconds")
time.sleep(seconds)
######################## GenerateAuthorizeToken ########################
def GenerateAuthorizeToken(instance, customerId, flinksauthkey
, elapsed_times):
url1 = "https://"
url2 = "-api.private.fin.ag/v3/"
url3 = "/BankingServices/"
endpoint = "GenerateAuthorizeToken"
url = url1 + instance + url2 + customerId + url3 + endpoint
headers = {'flinks-auth-key': flinksauthkey}
# Making the API call and measure its time
start_time = time.time()
print("\033[1m","Calling",endpoint,"\033[0m")
response = requests.post(url, headers=headers)
end_time = time.time()
duration_rounded = round(end_time - start_time, 2)
elapsed_times.append([endpoint,duration_rounded])
if response.status_code == 200:
# The request was successful
response_data = response.json()
print("\t",f"Error: {response.status_code}")
#Capturing the TempToken
TempToken = response_data.get('Token')
#print(f"TempToken: {Token}")
print("\t",f"Request duration: {duration_rounded} seconds")
else:
# The request was unsuccessful
print("\t",f"Error: {response.status_code}")
print(response.text) # Print the error message or response content
print("\t",f"Request duration: {duration_rounded} seconds")
return TempToken, DocumentUploadInstitution, elapsed_times
# Add a delay for the next call
delay(3)
######################## AUTHORIZE ########################
def Authorize(instance, customerId, LoginId, TempToken, elapsed_times):
url1 = "https://"
url2 = "-api.private.fin.ag/v3/"
url3 = "/BankingServices/"
endpoint = "Authorize"
url = url1 + instance + url2 + customerId + url3 + endpoint
data = {"LoginId": LoginId, "MostRecentCached": "true"}
headers = {'flinks-auth-key': TempToken}
# Making the API call and measure its time
start_time = time.time()
print("\033[1m","Calling",endpoint,"\033[0m")
response = requests.post(url, headers=headers, json=data)
end_time = time.time()
duration_rounded = round(end_time - start_time, 2)
elapsed_times.append([endpoint,duration_rounded])
if response.status_code == 200:
# The request was successful
response_data = response.json()
print("\t",f"Error: {response.status_code}")
#Capturing the RequestId
RequestId = response_data.get('RequestId')
#Capturing the DocumentUploadInstitution
DocumentUploadInstitution = response_data.get('DocumentUploadInstitution')
#print(f"RequestId: {RequestId}")
print("\t",f"Request duration: {duration_rounded} seconds")
else:
# The request was unsuccessful
print("\t",f"Error: {response.status_code}")
print(response.text) # Print the error message or response content
print("\t",f"Request duration: {duration_rounded} seconds")
return RequestId, DocumentUploadInstitution, elapsed_times
# Add a delay for the next call
delay(3)
######################## GETACCOUNTSDETAIL ########################
def GAD(instance, customerId, xapikey , elapsed_times, RequestId):
url1 = "https://"
url2 = "-api.private.fin.ag/v3/"
url3 = "/BankingServices/"
# Setting up the API call
endpoint = "GetAccountsDetail"
url = url1 + instance + url2 + customerId + url3 + endpoint
data = {"RequestId": RequestId}
headers = {'x-api-key': xapikey}
# Making the API call and measure its time
start_time = time.time()
print("\033[1m","Calling",endpoint,"\033[0m")
response = requests.post(url, headers=headers, json=data)
end_time = time.time()
duration_rounded = round(end_time - start_time, 2)
elapsed_times.append([endpoint,duration_rounded])
if response.status_code == 200:
# The request was successful
response_data = response.json()
response_code = response.status_code
print("\t",f"Request duration: {duration_rounded} seconds")
else:
response_code = response.status_code
response_data = response.json()
# The request was unsuccessful
print("\t",f"Error: ", response_code)
print("\t",f"Request duration: {duration_rounded} seconds")
#GADASync()
return response_code , response_data, elapsed_times
######################## GETACCOUNTSDETAILASYNC ########################
def GADASync(instance, customerId, elapsed_times, RequestId):
url1 = "https://"
url2 = "-api.private.fin.ag/v3/"
url3 = "/BankingServices/"
# Setting up the API call
endpoint = "GetAccountsDetailAsync/"
url = url1 + instance + url2 + customerId + url3 + endpoint + RequestId
# Making the first API call and measure its time
start_time = time.time()
print("\033[1m","Calling",endpoint,"\033[0m")
response = requests.get(url)
end_time = time.time()
duration_rounded = round(end_time - start_time, 2)
elapsed_times.append([endpoint,duration_rounded])
while response.status_code == 202:
print("\t",f"Error: {response.status_code}")
delay(3)
start_time = time.time()
print("\t",'\033[4m',"Calling",endpoint,'\033[0m')
response = requests.get(url)
end_time = time.time()
duration_rounded = round(end_time - start_time, 2)
elapsed_times.append([endpoint,duration_rounded])
response_code = response.status_code
response_data = response.json()
if response.status_code == 200:
# The request was successful
response_code = response.status_code
response_data = response.json()
#print("\t",response_data)
#Capturing the RequestId
#RequestId = response_data.get('RequestId')
#print(f"RequestId: {RequestId}")
print("\t",f"Request duration: {duration_rounded} seconds")
print ("\t","GADAsync_endpoint code", response_code)
print ("\t","GADAsync_endpoint result:", response_data)
return response_code , response_data, elapsed_times
######################## GETALLATTRIBUTES ########################
def AllAtributes(instance, customerId, LoginId, xapikey, elapsed_times, RequestId):
url1 = "https://"
url2 = "-api.private.fin.ag/v3/"
url3 = "/insight/login/"
url4 = "/attributes/"
# Setting up the API call
endpoint = "/GetAllAttributes"
url = url1 + instance + url2 + customerId + url3 + LoginId + url4 + RequestId + endpoint
headers = {'x-api-key': xapikey}
# Making the first API call and measure its time
start_time = time.time()
print("Calling",endpoint)
response = requests.get(url, headers=headers )
end_time = time.time()
duration_rounded = round(end_time - start_time, 2)
elapsed_times.append([endpoint,duration_rounded])
if response.status_code == 200:
# The request was successful
response_data = response.json()
print(f"Error: {response.status_code}")
print(response.text)
else:
# The request was unsuccessful
print(f"Error: {response.status_code}")
print(response.text) # Print the error message or response content
print(f"Request duration: {duration_rounded} seconds")
return response_data, elapsed_times
###3.VARIALBLES###
######################## VARIABLES ########################
instance = "toolbox" #<------- ADD YOUR INSTANCES NAME
customerId = "43387ca6-0391-4c82-857d-70d95f087ecb" #<------- ADD YOUR CUSTOMER ID
LoginId = "310196b5-0151-4bd9-053b-08ddeae4001f" #<------- ADD THE LOGINID
RequestId = ""
flinksauthkey = "c4569c54-e167-4d34-8de6-f4113bc82414"
xapikey = "3d5266a8-b697-48d4-8de6-52e2e2662acc"
DocumentUploadInstitution = ""
elapsed_times = [0]
###4.API CALLS AND SAVING FILES###
######################## Authentification API ########################
GenerateAuthorizeTokenResponse = GenerateAuthorizeToken(instance, customerId, flinksauthkey
, elapsed_times)
TempToken = GenerateAuthorizeTokenResponse [0]
DocumentUploadInstitution = GenerateAuthorizeTokenResponse[1]
elapsed_times = GenerateAuthorizeTokenResponse[2]
print ("")
AuthorizeResponse = Authorize(instance, customerId, LoginId, TempToken, elapsed_times)
RequestId = AuthorizeResponse[0]
DocumentUploadInstitution = AuthorizeResponse[1]
elapsed_times = AuthorizeResponse[2]
print("")
######################## GAD+GADAsyn API ########################
GAD_payload = GAD(instance, customerId, xapikey , elapsed_times, RequestId)
if GAD_payload [0] == 200:
webhook_payload = GAD_payload [1]
webhook_code = GAD_payload [0]
print ("\t","GAD code", webhook_code)
print ("\t","GAD result:", webhook_payload)
print("")
json_filename = 'GAD_'+LoginId+'.json'
with open(json_filename, 'w') as json_file:
json.dump(webhook_payload, json_file, indent=4)
print(f"GADAsync_JSON data saved to {json_filename}")
else:
GADASync_payload = GADASync(instance, customerId, elapsed_times, RequestId)
webhook_payload = GADASync_payload [1]
webhook_code = GADASync_payload [0]
print ("\t","GADAsync code", webhook_code)
print ("\t","GADAsync result:", webhook_payload)
print("")
json_filename = 'GADAsync_'+LoginId+'.json'
with open(json_filename, 'w') as json_file:
json.dump(webhook_payload, json_file, indent=4)
print(f"GADAsync_JSON data saved to {json_filename}")
print("")
######################## AllAttributes API ########################
GETAllAttributes = AllAtributes(instance, customerId, LoginId, xapikey, elapsed_times, RequestId)
print ("\t","GetAttributes result", GETAllAttributes)
print("")
# Write JSON data to a file
json_filename = 'GetAllAttributes_'+LoginId+'.json'
with open(json_filename, 'w') as json_file:
json.dump(GETAllAttributes, json_file, indent=4)
print(f"GetAllAttributes_JSON data saved to {json_filename}")
print("")Was this article helpful?
That’s Great!
Thank you for your feedback
Sorry! We couldn't be helpful
Thank you for your feedback
Feedback sent
We appreciate your effort and will try to fix the article