Summary: The Python code below performs API calls to Flink's products and saves the results to .json files.
Note: The code uses our /toolbox/ instance. To test it with your own instance, adjust the variables in section 3 of the code.
1. Code #1: Testing the Authorize, GetAccountsDetail, GetAccountsDetailAsync and GetAllAttributes APIs
###1.LIBRARIES###
######################## Libraries ########################
import requests
import time
import json
# NOTE(review): nothing below appears to use a name from `endpoints`;
# the import is kept in case it is required elsewhere - confirm.
from endpoints import *


###2.FUNCTIONS###
######################## HELPERS ########################
def _base_url(instance, customerId):
    """Return the URL prefix shared by every endpoint for one instance/customer."""
    return "https://" + instance + "-api.private.fin.ag/v3/" + customerId


def _timed_call(send, url, endpoint, elapsed_times, **kwargs):
    """Send one request and record how long it took.

    Args:
        send: The ``requests`` function to use (``requests.get`` / ``requests.post``).
        url: Full URL to call.
        endpoint: Label stored next to the duration in ``elapsed_times``.
        elapsed_times: List that receives ``[endpoint, seconds]`` for this call.
        **kwargs: Forwarded unchanged to ``send`` (for example ``json=...``).

    Returns:
        A ``(response, duration_rounded)`` tuple; the duration is in seconds,
        rounded to two decimals.
    """
    start_time = time.time()
    response = send(url, **kwargs)
    duration_rounded = round(time.time() - start_time, 2)
    elapsed_times.append([endpoint, duration_rounded])
    return response, duration_rounded


def _parse_body(response):
    """Return the decoded JSON body, or the raw text when the body is not JSON."""
    try:
        return response.json()
    except ValueError:
        # Error responses are not guaranteed to carry JSON; keep whatever
        # was returned instead of crashing.
        return response.text


######################## DELAY ########################
def delay(seconds):
    """Announce and then sleep for ``seconds`` seconds."""
    print("Waiting for the delay of", seconds, "seconds")
    time.sleep(seconds)


######################## AUTHORIZE ########################
def Authorize(instance, customerId, LoginId, elapsed_times):
    """Call the Authorize endpoint for ``LoginId``.

    Args:
        instance: Instance name used as the host prefix.
        customerId: Customer id placed in the URL path.
        LoginId: Login id sent in the request body.
        elapsed_times: List that receives ``[endpoint, seconds]`` for the call.

    Returns:
        ``(RequestId, DocumentUploadInstitution)`` read from the response body
        on HTTP 200, or ``(None, None)`` for any other status.
    """
    endpoint = "Authorize"
    url = _base_url(instance, customerId) + "/BankingServices/" + endpoint
    data = {"LoginId": LoginId, "MostRecentCached": "true"}

    # Making the API call and measuring its time
    print("\033[1m", "Calling", endpoint, "\033[0m")
    response, duration_rounded = _timed_call(
        requests.post, url, endpoint, elapsed_times, json=data
    )

    # Defaults so the return below is always defined, even on failure.
    RequestId = None
    DocumentUploadInstitution = None

    if response.status_code == 200:
        # The request was successful
        response_data = response.json()
        print("\t", f"Status: {response.status_code}")
        RequestId = response_data.get('RequestId')
        DocumentUploadInstitution = response_data.get('DocumentUploadInstitution')
    else:
        # The request was unsuccessful
        print("\t", f"Error: {response.status_code}")
        print(response.text)  # Print the error message or response content

    print("\t", f"Request duration: {duration_rounded} seconds")
    return RequestId, DocumentUploadInstitution


######################## GETACCOUNTSDETAIL ########################
def GAD(instance, customerId, LoginId, elapsed_times, RequestId):
    """Call the GetAccountsDetail endpoint for ``RequestId``.

    Args:
        instance: Instance name used as the host prefix.
        customerId: Customer id placed in the URL path.
        LoginId: Unused here; kept so all endpoint helpers share a signature.
        elapsed_times: List that receives ``[endpoint, seconds]`` for the call.
        RequestId: Request id sent in the request body.

    Returns:
        ``(response_code, response_data)`` where ``response_data`` is the
        decoded JSON body, or the raw text if the body is not JSON.
    """
    # Setting up the API call
    endpoint = "GetAccountsDetail"
    url = _base_url(instance, customerId) + "/BankingServices/" + endpoint
    data = {"RequestId": RequestId}

    # Making the API call and measuring its time
    print("\033[1m", "Calling", endpoint, "\033[0m")
    response, duration_rounded = _timed_call(
        requests.post, url, endpoint, elapsed_times, json=data
    )

    response_code = response.status_code
    response_data = _parse_body(response)

    if response_code != 200:
        # The request was unsuccessful
        print("\t", f"Error: ", response_code)
    print("\t", f"Request duration: {duration_rounded} seconds")
    return response_code, response_data


######################## GETACCOUNTSDETAILASYNC ########################
def GADASync(instance, customerId, LoginId, elapsed_times, RequestId,
             max_polls=200, poll_interval=3):
    """Poll the GetAccountsDetailAsync endpoint until it stops answering 202.

    Args:
        instance: Instance name used as the host prefix.
        customerId: Customer id placed in the URL path.
        LoginId: Unused here; kept so all endpoint helpers share a signature.
        elapsed_times: List that receives one ``[endpoint, seconds]`` entry
            per request sent.
        RequestId: Request id appended to the URL.
        max_polls: Maximum number of retries after the first call. The default
            is an arbitrary safety cap so a request that never completes
            cannot loop forever; raise it if needed.
        poll_interval: Seconds to wait between retries.

    Returns:
        ``(response_code, response_data)`` of the last response received;
        ``response_data`` is the decoded JSON body, or the raw text if the
        body is not JSON. The code is still 202 when ``max_polls`` ran out.
    """
    # Setting up the API call
    endpoint = "GetAccountsDetailAsync/"
    url = _base_url(instance, customerId) + "/BankingServices/" + endpoint + RequestId

    # Making the first API call and measuring its time
    print("\033[1m", "Calling", endpoint, "\033[0m")
    response, duration_rounded = _timed_call(
        requests.get, url, endpoint, elapsed_times
    )

    polls = 0
    while response.status_code == 202:
        if polls >= max_polls:
            print("\t", f"Giving up after {max_polls} retries; last status was 202")
            break
        # 202 is not an error: the result is simply not available yet.
        print("\t", f"Status: {response.status_code}")
        delay(poll_interval)
        print("\t", '\033[4m', "Calling", endpoint, '\033[0m')
        response, duration_rounded = _timed_call(
            requests.get, url, endpoint, elapsed_times
        )
        polls += 1

    response_code = response.status_code
    response_data = _parse_body(response)

    print("\t", f"Request duration: {duration_rounded} seconds")
    print("\t", "GADAsync_endpoint code", response_code)
    print("\t", "GADAsync_endpoint result:", response_data)
    return response_code, response_data


######################## GETALLATTRIBUTES ########################
def AllAtributes(instance, customerId, LoginId, elapsed_times, RequestId):
    """Call the GetAllAttributes endpoint for ``LoginId`` / ``RequestId``.

    Args:
        instance: Instance name used as the host prefix.
        customerId: Customer id placed in the URL path.
        LoginId: Login id placed in the URL path.
        elapsed_times: List that receives ``[endpoint, seconds]`` for the call.
        RequestId: Request id placed in the URL path.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``.
    """
    # Setting up the API call
    endpoint = "/GetAllAttributes"
    url = (
        _base_url(instance, customerId)
        + "/insight/login/" + LoginId
        + "/attributes/" + RequestId
        + endpoint
    )

    # Making the API call and measuring its time
    print("Calling", endpoint)
    response, duration_rounded = _timed_call(
        requests.get, url, endpoint, elapsed_times
    )

    # Default so the return below is always defined, even on failure.
    response_data = None
    if response.status_code == 200:
        # The request was successful
        response_data = response.json()
        print(f"Status: {response.status_code}")
    else:
        # The request was unsuccessful
        print(f"Error: {response.status_code}")
    print(response.text)  # Print the response content (or the error message)

    print(f"Request duration: {duration_rounded} seconds")
    return response_data


###3.VARIABLES###
######################## VARIABLES ########################
instance = "toolbox"  #<------- ADD YOUR INSTANCES NAME
customerId = "43387ca6-0391-4c82-857d-70d95f087ecb"  #<------- ADD YOUR CUSTOMER ID
LoginId = "3b2ccc0e-937f-43f5-92b9-08dc83eb1e49"  #<------- ADD THE LOGINID
RequestId = ""
DocumentUploadInstitution = ""
elapsed_times = []


###4.API CALLS AND SAVING FILES###
######################## Authentication API ########################
AuthorizeResponse = Authorize(instance, customerId, LoginId, elapsed_times)
RequestId = AuthorizeResponse[0]
DocumentUploadInstitution = AuthorizeResponse[1]
if RequestId is None:
    # Every following call needs the RequestId, so there is nothing left to do.
    raise SystemExit("Authorize did not return a RequestId; stopping.")
print("")

######################## GAD+GADAsync API ########################
GAD_payload = GAD(instance, customerId, LoginId, elapsed_times, RequestId)

if GAD_payload[0] == 200:
    webhook_payload = GAD_payload[1]
    webhook_code = GAD_payload[0]
    print("\t", "GAD code", webhook_code)
    print("\t", "GAD result:", webhook_payload)
    print("")
    json_filename = 'GAD_' + LoginId + '.json'
    with open(json_filename, 'w', encoding='utf-8') as json_file:
        json.dump(webhook_payload, json_file, indent=4)
    print(f"GAD_JSON data saved to {json_filename}")
else:
    GADASync_payload = GADASync(instance, customerId, LoginId, elapsed_times, RequestId)
    webhook_payload = GADASync_payload[1]
    webhook_code = GADASync_payload[0]
    print("\t", "GADAsync code", webhook_code)
    print("\t", "GADAsync result:", webhook_payload)
    print("")
    json_filename = 'GADAsync_' + LoginId + '.json'
    with open(json_filename, 'w', encoding='utf-8') as json_file:
        json.dump(webhook_payload, json_file, indent=4)
    print(f"GADAsync_JSON data saved to {json_filename}")
print("")

######################## AllAttributes API ########################
GETAllAttributes = AllAtributes(instance, customerId, LoginId, elapsed_times, RequestId)
print("\t", "GetAttributes result", GETAllAttributes)
print("")

# Write JSON data to a file (skipped when the call failed and returned nothing)
if GETAllAttributes is not None:
    json_filename = 'GetAllAttributes_' + LoginId + '.json'
    with open(json_filename, 'w', encoding='utf-8') as json_file:
        json.dump(GETAllAttributes, json_file, indent=4)
    print(f"GetAllAttributes_JSON data saved to {json_filename}")
else:
    print("GetAllAttributes returned no data; nothing was saved")
print("")
Was this article helpful?
That’s Great!
Thank you for your feedback
Sorry! We couldn't be helpful
Thank you for your feedback
Feedback sent
We appreciate your effort and will try to fix the article