Summary: The Python code below calls Flink's product APIs and saves the results to .json files.
Note: The code uses our /toolbox/ instance. Please adjust the variables in section 3 to test it with your own instance.
1. Code #1: Testing the Authorize, GetAccountsDetail, GetAccountsDetailAsync and GetAllAttributes APIs
###1.LIBRARIES###
######################## Libraries ########################
import requests
import time
import json
from endpoints import *  # NOTE(review): nothing below visibly uses a name from this module - confirm before removing


###2.FUNCTIONS###
######################## HELPERS ########################
def _api_root(instance, customerId):
    """Return the common URL prefix 'https://<instance>-api.private.fin.ag/v3/<customerId>'."""
    return "https://" + instance + "-api.private.fin.ag/v3/" + customerId


def _timed_call(send, endpoint, elapsed_times):
    """Run `send()`, append [endpoint, seconds] to `elapsed_times`, return (response, seconds)."""
    start_time = time.time()
    response = send()
    duration_rounded = round(time.time() - start_time, 2)
    elapsed_times.append([endpoint, duration_rounded])
    return response, duration_rounded


def _json_or_text(response):
    """Return the decoded JSON body, or the raw text when the body is not valid JSON."""
    try:
        return response.json()
    except ValueError:
        # requests signals an undecodable body with a ValueError subclass;
        # error responses are not guaranteed to carry JSON.
        return response.text


def _save_json(payload, json_filename):
    """Write `payload` to `json_filename` as indented JSON."""
    with open(json_filename, 'w') as json_file:
        json.dump(payload, json_file, indent=4)


######################## DELAY ########################
def delay(seconds):
    """Announce the wait on stdout, then sleep for `seconds` seconds."""
    print("Waiting for the delay of", seconds, "seconds")
    time.sleep(seconds)


######################## AUTHORIZE ########################
def Authorize(instance, customerId, LoginId, elapsed_times):
    """POST to the BankingServices/Authorize endpoint and extract the session ids.

    Args:
        instance: Instance name used as the host prefix of the API URL.
        customerId: Customer id placed in the URL path.
        LoginId: Sent in the request body together with "MostRecentCached".
        elapsed_times: List that receives one [endpoint, seconds] entry.

    Returns:
        Tuple (RequestId, DocumentUploadInstitution) read from the response
        body. Both are None when the call does not answer 200 or the body is
        not a JSON object, so callers must check RequestId before going on.
    """
    endpoint = "Authorize"
    url = _api_root(instance, customerId) + "/BankingServices/" + endpoint
    data = {"LoginId": LoginId, "MostRecentCached": "true"}

    # Making the API call and measure its time
    print("\033[1m", "Calling", endpoint, "\033[0m")
    response, duration_rounded = _timed_call(
        lambda: requests.post(url, json=data), endpoint, elapsed_times
    )

    # Defaults keep the return statement valid on every path (the original
    # left these names unbound whenever the status was not 200).
    RequestId = None
    DocumentUploadInstitution = None

    if response.status_code == 200:
        # The request was successful
        print("\t", f"Status: {response.status_code}")
        response_data = _json_or_text(response)
        if isinstance(response_data, dict):
            RequestId = response_data.get('RequestId')
            DocumentUploadInstitution = response_data.get('DocumentUploadInstitution')
        print("\t", f"Request duration: {duration_rounded} seconds")
        # Add a delay for the next call (this used to sit after the return
        # statement and therefore never ran).
        delay(3)
    else:
        # The request was unsuccessful
        print("\t", f"Error: {response.status_code}")
        print(response.text)  # Print the error message or response content
        print("\t", f"Request duration: {duration_rounded} seconds")

    return RequestId, DocumentUploadInstitution


######################## GETACCOUNTSDETAIL ########################
def GAD(instance, customerId, LoginId, elapsed_times, RequestId):
    """POST to the BankingServices/GetAccountsDetail endpoint.

    Args:
        instance: Instance name used as the host prefix of the API URL.
        customerId: Customer id placed in the URL path.
        LoginId: Unused here; kept so all endpoint helpers share one signature.
        elapsed_times: List that receives one [endpoint, seconds] entry.
        RequestId: Sent as the "RequestId" field of the request body.

    Returns:
        Tuple (status_code, body) where body is the decoded JSON, or the raw
        response text when the body is not valid JSON.
    """
    # Setting up the API call
    endpoint = "GetAccountsDetail"
    url = _api_root(instance, customerId) + "/BankingServices/" + endpoint
    data = {"RequestId": RequestId}

    # Making the API call and measure its time
    print("\033[1m", "Calling", endpoint, "\033[0m")
    response, duration_rounded = _timed_call(
        lambda: requests.post(url, json=data), endpoint, elapsed_times
    )

    response_code = response.status_code
    response_data = _json_or_text(response)
    if response_code != 200:
        # The request was unsuccessful
        print("\t", "Error: ", response_code)
    print("\t", f"Request duration: {duration_rounded} seconds")
    return response_code, response_data


######################## GETACCOUNTSDETAILASYNC ########################
def GADASync(instance, customerId, LoginId, elapsed_times, RequestId):
    """GET BankingServices/GetAccountsDetailAsync/<RequestId>, polling while it answers 202.

    The call is repeated every 3 seconds for as long as the status code is
    202; there is no upper bound on the number of attempts.

    Args:
        instance: Instance name used as the host prefix of the API URL.
        customerId: Customer id placed in the URL path.
        LoginId: Unused here; kept so all endpoint helpers share one signature.
        elapsed_times: List that receives one [endpoint, seconds] entry per attempt.
        RequestId: Appended to the URL path; must be a string.

    Returns:
        Tuple (status_code, body) of the first non-202 response, where body is
        the decoded JSON, or the raw response text when it is not valid JSON.
    """
    # Setting up the API call
    endpoint = "GetAccountsDetailAsync/"
    url = _api_root(instance, customerId) + "/BankingServices/" + endpoint + RequestId

    # Making the first API call and measure its time
    print("\033[1m", "Calling", endpoint, "\033[0m")
    response, duration_rounded = _timed_call(
        lambda: requests.get(url), endpoint, elapsed_times
    )

    while response.status_code == 202:
        # 202 is not an error: the result is simply not available yet.
        print("\t", f"Status: {response.status_code} (still processing)")
        delay(3)
        print("\t", '\033[4m', "Calling", endpoint, '\033[0m')
        response, duration_rounded = _timed_call(
            lambda: requests.get(url), endpoint, elapsed_times
        )

    response_code = response.status_code
    response_data = _json_or_text(response)
    print("\t", f"Request duration: {duration_rounded} seconds")
    print("\t", "GADAsync_endpoint code", response_code)
    print("\t", "GADAsync_endpoint result:", response_data)
    return response_code, response_data


######################## GETALLATTRIBUTES ########################
def AllAtributes(instance, customerId, LoginId, elapsed_times, RequestId):
    """GET insight/login/<LoginId>/attributes/<RequestId>/GetAllAttributes.

    Args:
        instance: Instance name used as the host prefix of the API URL.
        customerId: Customer id placed in the URL path.
        LoginId: Placed in the URL path; must be a string.
        elapsed_times: List that receives one [endpoint, seconds] entry.
        RequestId: Placed in the URL path; must be a string.

    Returns:
        The decoded response body on status 200 (raw text if it is not valid
        JSON), or None for any other status.
    """
    # Setting up the API call
    endpoint = "/GetAllAttributes"
    url = (
        _api_root(instance, customerId)
        + "/insight/login/" + LoginId
        + "/attributes/" + RequestId
        + endpoint
    )

    # Making the API call and measure its time
    print("Calling", endpoint)
    response, duration_rounded = _timed_call(
        lambda: requests.get(url), endpoint, elapsed_times
    )

    # Default keeps the return statement valid when the call fails (the
    # original left this name unbound on any non-200 status).
    response_data = None
    if response.status_code == 200:
        # The request was successful
        response_data = _json_or_text(response)
        print(f"Status: {response.status_code}")
        print(response.text)
    else:
        # The request was unsuccessful
        print(f"Error: {response.status_code}")
        print(response.text)  # Print the error message or response content
    print(f"Request duration: {duration_rounded} seconds")
    return response_data


###3.VARIABLES###
######################## VARIABLES ########################
instance = "toolbox"  #<------- ADD YOUR INSTANCES NAME
customerId = "43387ca6-0391-4c82-857d-70d95f087ecb"  #<------- ADD YOUR CUSTOMER ID
LoginId = "3b2ccc0e-937f-43f5-92b9-08dc83eb1e49"  #<------- ADD THE LOGINID
RequestId = ""
DocumentUploadInstitution = ""
elapsed_times = []

###4.API CALLS AND SAVING FILES###
######################## Authentification API ########################
AuthorizeResponse = Authorize(instance, customerId, LoginId, elapsed_times)
RequestId = AuthorizeResponse[0]
DocumentUploadInstitution = AuthorizeResponse[1]
print("")

if not RequestId:
    # Every later call needs the RequestId (two of them splice it into the
    # URL), so stop here with a clear message instead of failing further down.
    raise SystemExit("Authorize did not return a RequestId; aborting.")

######################## GAD+GADAsyn API ########################
GAD_payload = GAD(instance, customerId, LoginId, elapsed_times, RequestId)
if GAD_payload[0] == 200:
    webhook_payload = GAD_payload[1]
    webhook_code = GAD_payload[0]
    print("\t", "GAD code", webhook_code)
    print("\t", "GAD result:", webhook_payload)
    print("")
    json_filename = 'GAD_' + LoginId + '.json'
    _save_json(webhook_payload, json_filename)
    print(f"GAD_JSON data saved to {json_filename}")
else:
    # Any non-200 answer from GetAccountsDetail falls back to the async endpoint.
    GADASync_payload = GADASync(instance, customerId, LoginId, elapsed_times, RequestId)
    webhook_payload = GADASync_payload[1]
    webhook_code = GADASync_payload[0]
    print("\t", "GADAsync code", webhook_code)
    print("\t", "GADAsync result:", webhook_payload)
    print("")
    json_filename = 'GADAsync_' + LoginId + '.json'
    _save_json(webhook_payload, json_filename)
    print(f"GADAsync_JSON data saved to {json_filename}")
print("")

######################## AllAttributes API ########################
GETAllAttributes = AllAtributes(instance, customerId, LoginId, elapsed_times, RequestId)
print("\t", "GetAttributes result", GETAllAttributes)
print("")

if GETAllAttributes is not None:
    # Write JSON data to a file
    json_filename = 'GetAllAttributes_' + LoginId + '.json'
    _save_json(GETAllAttributes, json_filename)
    print(f"GetAllAttributes_JSON data saved to {json_filename}")
else:
    print("GetAllAttributes failed; no file was written.")
print("")