import os
import ast
import sys
import re
import copy
import time
import base64
import json
import random
import liquid
import logging
import urllib3
import hashlib
import zipfile
import asyncio
import requests
import http.client
import urllib.parse
import jinja2
import datetime
import dateutil

import threading
import concurrent.futures

from io import StringIO as StringBuffer, BytesIO
from liquid import Liquid, defaults

runtime = os.getenv("SHUFFLE_SWARM_CONFIG", "")

###
###
###
#### Filters for liquidpy
###
###
###

defaults.MODE = 'wild'
defaults.FROM_FILE = False
from liquid.filters.manager import FilterManager
from liquid.filters.standard import standard_filter_manager

shuffle_filters = FilterManager()
for key, value in standard_filter_manager.filters.items():
    shuffle_filters.filters[key] = value
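
# Illustrative render with the merged filter set (mirrors the commented tests
# further down; "split" is registered later in this file):
#   Liquid("{{ 'a,b,c' | split: ',' }}", filters=shuffle_filters.filters).render()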

#@shuffle_filters.register
#def plus(a, b):
#    try:
#        a = int(a)
#    except:
#        a = 0
#
#    try:
#        b = int(b)
#    except:
#        b = 0
#
#    return standard_filter_manager.filters["plus"](a, b)
#
#@shuffle_filters.register
#def minus(a, b):
#    a = int(a)
#    b = int(b)
#    return standard_filter_manager.filters["minus"](a, b)
#
#@shuffle_filters.register
#def multiply(a, b):
#    a = int(a)
#    b = int(b)
#    return standard_filter_manager.filters["multiply"](a, b)
#
#@shuffle_filters.register
#def divide(a, b):
#    a = int(a)
#    b = int(b)
#    return standard_filter_manager.filters["divide"](a, b)

@shuffle_filters.register
def md5(a):
    a = str(a)
    return hashlib.md5(a.encode('utf-8')).hexdigest()

@shuffle_filters.register
def sha256(a):
    a = str(a)
    return hashlib.sha256(a.encode("utf-8")).hexdigest()

@shuffle_filters.register
def md5_base64(a):
    a = str(a)
    foundhash = hashlib.md5(a.encode('utf-8')).hexdigest()
    return base64.b64encode(foundhash.encode('utf-8'))

@shuffle_filters.register
def base64_encode(a):
    a = str(a)
    try:
        return base64.b64encode(a.encode('utf-8')).decode()
    except:
        return base64.b64encode(a).decode()

@shuffle_filters.register
def base64_decode(a):
    a = str(a)
    try:
        return base64.b64decode(a).decode("unicode_escape")
    except:
        try:
            return base64.b64decode(a).decode()
        except:
            return base64.b64decode(a)

@shuffle_filters.register
def json_parse(a):
    return json.loads(str(a))

@shuffle_filters.register
def as_object(a):
    return json.loads(str(a))

@shuffle_filters.register
def ast(a):
    # This filter shadows the global "ast" module name, so grab the module
    # from sys.modules instead of calling the filter itself recursively
    return sys.modules["ast"].literal_eval(str(a))

@shuffle_filters.register
def escape_string(a):
    a = str(a)
    return a.replace("\\\'", "\'", -1).replace("\\\"", "\"", -1).replace("'", "\\\'", -1).replace("\"", "\\\"", -1)

@shuffle_filters.register
def json_escape(a):
    a = str(a)
    return a.replace("\\\'", "\'", -1).replace("\\\"", "\"", -1).replace("'", "\\\\\'", -1).replace("\"", "\\\\\"", -1)

@shuffle_filters.register
def escape_json(a):
    a = str(a)
    return a.replace("\\\'", "\'", -1).replace("\\\"", "\"", -1).replace("'", "\\\\\'", -1).replace("\"", "\\\\\"", -1)

# By default using json_escape to add all backslashes
@shuffle_filters.register
def escape(a):
    a = str(a)
    return json_escape(a)

@shuffle_filters.register
def neat_json(a):
    try:
        a = json.loads(a)
    except:
        pass

    return json.dumps(a, indent=4, sort_keys=True)

@shuffle_filters.register
def flatten(a):
    a = list(a)

    # Use distinct names instead of reusing "a" as the inner loop variable
    flat_list = [item for sublist in a for item in sublist]
    return flat_list

@shuffle_filters.register
def last(a):
    try:
        a = json.loads(a)
    except:
        pass

    if len(a) == 0:
        return ""

    return a[-1]

@shuffle_filters.register
def first(a):
    try:
        a = json.loads(a)
    except:
        pass

    if len(a) == 0:
        return ""

    return a[0]

@shuffle_filters.register
def csv_parse(a):
    a = str(a)
    splitdata = a.split("\n")
    columns = []
    if len(splitdata) > 1:
        columns = splitdata[0].split(",")
    else:
        return a.split("\n")

    allitems = []
    cnt = -1
    for item in splitdata[1:]:
        cnt += 1
        commasplit = item.split(",")

        fullitem = {}
        fullitem["unparsed"] = item
        fullitem["index"] = cnt
        fullitem["parsed"] = {}
        if len(columns) != len(commasplit):
            if len(commasplit) > len(columns):
                diff = len(commasplit)-len(columns)

                try:
                    commasplit = commasplit[0:len(commasplit)-diff]
                except:
                    pass
            else:
                for item in range(0, len(columns)-len(commasplit)):
                    commasplit.append("")

        for key in range(len(columns)):
            try:
                fullitem["parsed"][columns[key]] = commasplit[key]
            except:
                continue

        allitems.append(fullitem)

    try:
        return json.dumps(allitems)
    except:
        print("[ERROR] Failed dumping to JSON in csv parse")
        return allitems

@shuffle_filters.register
def parse_csv(a):
    return csv_parse(a)

@shuffle_filters.register
def format_csv(a):
    return csv_parse(a)

@shuffle_filters.register
def csv_format(a):
    return csv_parse(a)
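
# Example of the csv_parse output shape:
#   csv_parse("name,ip\nfoo,1.2.3.4")
#   -> '[{"unparsed": "foo,1.2.3.4", "index": 0, "parsed": {"name": "foo", "ip": "1.2.3.4"}}]'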

@shuffle_filters.register
def split(base, sep):
    if not sep:
        try:
            return json.dumps(list(base))
        except:
            return list(base)

    try:
        return json.dumps(base.split(sep))
    except:
        return base.split(sep)

#print(shuffle_filters.filters)
#print(Liquid("{{ '10' | plus: 1}}", filters=shuffle_filters.filters).render())
#print(Liquid("{{ '10' | minus: 1}}", filters=shuffle_filters.filters).render())
#print(Liquid("{{ asd | size }}", filters=shuffle_filters.filters).render())
#print(Liquid("{{ 'asd' | md5 }}", filters=shuffle_filters.filters).render())
#print(Liquid("{{ 'asd' | sha256 }}", filters=shuffle_filters.filters).render())
#print(Liquid("{{ 'asd' | md5_base64 | base64_decode }}", filters=shuffle_filters.filters).render())

###
###
###
###
###
###
###


class AppBase:
    __version__ = None
    app_name = None

    def __init__(self, redis=None, logger=None, console_logger=None):#, docker_client=None):
        self.logger = logger if logger is not None else logging.getLogger("AppBaseLogger")

        if not os.getenv("SHUFFLE_LOGS_DISABLED") == "true":
            self.log_capture_string = StringBuffer()
            ch = logging.StreamHandler(self.log_capture_string)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            ch.setFormatter(formatter)
            # Attach to self.logger so this also works when no logger was passed in
            self.logger.addHandler(ch)

        self.redis = redis
        self.console_logger = console_logger if console_logger is not None else self.logger

        # apikey is for the user / org
        # authorization is for the specific workflow

        self.url = os.getenv("CALLBACK_URL", "https://shuffler.io")
        self.base_url = os.getenv("BASE_URL", "https://shuffler.io")
        self.action = os.getenv("ACTION", "")
        self.original_action = os.getenv("ACTION", "")
        self.authorization = os.getenv("AUTHORIZATION", "")
        self.current_execution_id = os.getenv("EXECUTIONID", "")
        self.full_execution = os.getenv("FULL_EXECUTION", "")
        self.start_time = int(time.time())
        self.result_wrapper_count = 0

        self.action_result = {
            "action": self.action,
            "authorization": self.authorization,
            "execution_id": self.current_execution_id,
            "result": "",
            "started_at": self.start_time,
            "status": "",
            "completed_at": int(time.time()),
        }

        if isinstance(self.action, str):
            try:
                self.action = json.loads(self.action)
                # Parse once, then copy - calling json.loads on the already
                # parsed dict would raise a TypeError
                self.original_action = copy.deepcopy(self.action)
            except Exception as e:
                self.logger.info(f"[DEBUG] Failed parsing action as JSON (init): {e}. NOT important if running apps with webserver. This is NOT critical.")

        #print(f"ACTION: {self.action}")

        if len(self.base_url) == 0:
            self.base_url = self.url

    # Checks output for whether it should be automatically parsed or not
    def run_magic_parser(self, input_data):
        if not isinstance(input_data, str):
            self.logger.info("[DEBUG] Not string. Returning from magic")
            return input_data

        # Don't touch existing JSON/lists
        if (input_data.startswith("[") and input_data.endswith("]")) or (input_data.startswith("{") and input_data.endswith("}")):
            self.logger.info("[DEBUG] Already JSON-like. Returning from magic")
            return input_data

        if len(input_data) < 3:
            self.logger.info("[DEBUG] Too short input data")
            return input_data

        # Don't touch large data.
        if len(input_data) > 100000:
            self.logger.info("[DEBUG] Value too large. Returning from magic")
            return input_data

        if not "\n" in input_data and not "," in input_data:
            self.logger.info("[DEBUG] No data to autoparse - requires newline or comma")
            return input_data

        new_input = input_data
        try:
            new_input = input_data.split()
            new_return = []

            index = 0
            for item in new_input:
                splititem = ","
                if ", " in item:
                    splititem = ", "
                elif "," in item:
                    splititem = ","
                else:
                    new_return.append(item)

                    index += 1
                    continue

                for subitem in item.split(splititem):
                    new_return.insert(index, subitem)

                    index += 1

                # Prevent large data or infinite loops
                if index > 10000:
                    self.logger.info(f"[DEBUG] Infinite loop. Returning default data.")
                    return input_data

            fixed_return = []
            for item in new_return:
                if not item:
                    continue

                if not isinstance(item, str):
                    fixed_return.append(item)
                    continue

                if item.endswith(","):
                    item = item[0:-1]

                fixed_return.append(item)

            new_input = fixed_return
        except Exception as e:
            #self.logger.info(f"[ERROR] Failed to run magic parser (2): {e}")
            return input_data

        # NOTE: a redundant re-split of input_data used to live here; it threw
        # away the comma handling above, so the cleaned list is used directly.
        # A list (or any other JSON-serializable object) is dumped back to a string
        if isinstance(new_input, list) or isinstance(new_input, object):
            try:
                return json.dumps(new_input)
            except Exception as e:
                self.logger.info(f"[ERROR] Failed to run magic parser (3): {e}")

        return new_input
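
    # Illustrative: run_magic_parser("a,b,c") -> '["a", "b", "c"]', while
    # '{"a": 1}', very short and very large strings pass through untouched.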

    def prepare_response(self, request):
        try:
            parsedheaders = {}
            for key, value in request.headers.items():
                parsedheaders[key] = value

            cookies = {}
            if request.cookies:
                for key, value in request.cookies.items():
                    cookies[key] = value

            jsondata = request.text
            try:
                jsondata = json.loads(jsondata)
            except:
                pass

            return json.dumps({
                "success": True,
                "status": request.status_code,
                "url": request.url,
                "headers": parsedheaders,
                "body": jsondata,
                "cookies": cookies,
            })
        except Exception as e:
            print(f"[WARNING] Failed in request: {e}")
            return request.text
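
    # Illustrative (any requests.Response object works here):
    #   resp = requests.get("https://shuffler.io")
    #   wrapped = self.prepare_response(resp)   # '{"success": true, "status": 200, ...}'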

    # FIXME: Add more info like logs in here.
    # Docker logs: https://forums.docker.com/t/docker-logs-inside-the-docker-container/68190/2
    def send_result(self, action_result, headers, stream_path):
        if action_result["status"] == "EXECUTING":
            action_result["status"] = "FAILURE"

        try:
            #self.logger.info(f"[DEBUG] ACTION: {self.action}")
            if self.action["run_magic_output"] == True:
                self.logger.warning(f"[INFO] Action result ran with Magic parser output.")
                action_result["result"] = self.run_magic_parser(action_result["result"])
            else:
                self.logger.warning(f"[WARNING] Magic output not defined.")
        except KeyError as e:
            #self.logger.warning(f"[DEBUG] Failed to run magic autoparser (send result) - keyerror: {e}")
            pass
        except Exception as e:
            #self.logger.warning(f"[DEBUG] Failed to run magic autoparser (send result): {e}")
            pass

        action_result["completed_at"] = int(time.time())
        self.logger.info(f"""[DEBUG] Inside Send result with status {action_result["status"]}""")

        # FIXME: Add cleanup of parameters to not send to frontend here
        params = {}

        # I wonder if this actually works
        self.logger.info(f"[DEBUG] Before last stream result")
        url = "%s%s" % (self.base_url, stream_path)
        self.logger.info(f"[INFO] URL FOR RESULT (URL): {url}")

        try:
            log_contents = "disabled: add env SHUFFLE_LOGS_DISABLED=true to Orborus to re-enable logs for apps. Can not be enabled natively in Cloud except in Hybrid mode."
            if not os.getenv("SHUFFLE_LOGS_DISABLED") == "true":
                log_contents = self.log_capture_string.getvalue()

            self.logger.info(f"[WARNING] Got logs of length {len(log_contents)}")
            if len(action_result["action"]["parameters"]) == 0:
                action_result["action"]["parameters"] = []

            param_found = False
            for param in action_result["action"]["parameters"]:
                if param["name"] == "shuffle_action_logs":
                    param_found = True
                    break

            if not param_found:
                action_result["action"]["parameters"].append({
                    "name": "shuffle_action_logs",
                    "value": log_contents,
                })

        except Exception as e:
            print(f"[WARNING] Failed adding parameter for logs: {e}")

        try:
            finished = False
            for i in range(0, 10):
                # Random sleeptime between 0 and 1 second, with 0.1 increments
                sleeptime = float(random.randint(0, 10) / 10)

                try:
                    ret = requests.post(url, headers=headers, json=action_result, timeout=10, verify=False)

                    self.logger.info(f"[DEBUG] Result: {ret.status_code} (break on 200 or 201)")
                    if ret.status_code == 200 or ret.status_code == 201:
                        finished = True
                        break
                    else:
                        self.logger.info(f"[ERROR] Bad resp {ret.status_code}: {ret.text}")

                except requests.exceptions.RequestException as e:
                    self.logger.info(f"[DEBUG] Request problem: {e}")
                    time.sleep(sleeptime)
                    continue
                except TimeoutError as e:
                    self.logger.info(f"[DEBUG] Timeout or request: {e}")
                    time.sleep(sleeptime)
                    continue
                except requests.exceptions.ConnectionError as e:
                    self.logger.info(f"[DEBUG] Connectionerror: {e}")
                    time.sleep(sleeptime)
                    continue
                except http.client.RemoteDisconnected as e:
                    self.logger.info(f"[DEBUG] Remote: {e}")
                    time.sleep(sleeptime)
                    continue
                except urllib3.exceptions.ProtocolError as e:
                    self.logger.info(f"[DEBUG] Protocol err: {e}")
                    time.sleep(0.1)
                    continue

            if not finished:
                # Not sure why this would work tho :)
                action_result["status"] = "FAILURE"
                action_result["result"] = json.dumps({"success": False, "reason": "POST error: Failed connecting to %s over 10 retries to the backend" % url})
                self.logger.info(f"[ERROR] Stream result NOT finished after 10 requests")

                self.send_result(action_result, {"Content-Type": "application/json", "Authorization": "Bearer %s" % self.authorization}, "/api/v1/streams")
                return

            self.logger.info(f"""[DEBUG] Successful request result request: Status= {ret.status_code} & Response= {ret.text}. Action status: {action_result["status"]}""")
        except requests.exceptions.ConnectionError as e:
            self.logger.info(f"[DEBUG] Unexpected ConnectionError happened: {e}")
        except TypeError as e:
            action_result["status"] = "FAILURE"
            action_result["result"] = json.dumps({"success": False, "reason": "Typeerror when sending to backend URL %s" % url})

            self.logger.info(f"[DEBUG] Before typeerror stream result: {e}")
            ret = requests.post("%s%s" % (self.base_url, stream_path), headers=headers, json=action_result, verify=False)

            self.logger.info(f"[DEBUG] TypeError request: Status= {ret.status_code} & Response= {ret.text}")
        except http.client.RemoteDisconnected as e:
            self.logger.info(f"[DEBUG] Expected Remotedisconnect happened: {e}")
        except urllib3.exceptions.ProtocolError as e:
            self.logger.info(f"[DEBUG] Expected ProtocolError happened: {e}")

        # FIXME: Re-enable data flushing otherwise we'll overload it all
        # Or nah?
        if not os.getenv("SHUFFLE_LOGS_DISABLED") == "true":
            try:
                self.log_capture_string.flush()
                #self.log_capture_string.close()
            except Exception as e:
                print(f"[WARNING] Failed to flush logs: {e}")

    #async def cartesian_product(self, L):
    def cartesian_product(self, L):
        if L:
            return {(a, ) + b for a in L[0] for b in self.cartesian_product(L[1:])}
        else:
            return {()}
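
    # Illustrative: self.cartesian_product([[1, 2], ["a"]]) == {(1, "a"), (2, "a")}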

    # Handles unique fields by negotiating with the backend
    def validate_unique_fields(self, params):
        newlist = [params]
        if isinstance(params, list):
            newlist = params

        # FIXME: Also handle MULTI PARAM
        values = []
        param_names = []
        all_values = {}
        index = 0
        for outerparam in newlist:
            param_value = ""
            for key, value in outerparam.items():
                for param in self.action["parameters"]:
                    try:
                        if param["name"] == key and param["unique_toggled"]:
                            self.logger.info(f"[DEBUG] FOUND: {key} with param {param}!")
                            if isinstance(value, dict) or isinstance(value, list):
                                try:
                                    value = json.dumps(value)
                                except json.decoder.JSONDecodeError as e:
                                    self.logger.info(f"[WARNING] Error in json decode for param {value}: {e}")
                                    continue
                            elif isinstance(value, bool):
                                # Checked before int, since bool is an int subclass
                                value = str(value)
                            elif isinstance(value, int) or isinstance(value, float):
                                value = str(value)

                            self.logger.info(f"[DEBUG] VALUE APPEND: {value}")
                            param_value += value
                            if param["name"] not in param_names:
                                param_names.append(param["name"])

                    except (KeyError, NameError) as e:
                        self.logger.info(f"""Key/NameError in param handler for {param["name"]}: {e}""")

            self.logger.info(f"[DEBUG] OUTER VALUE: {param_value}")
            if len(param_value) > 0:
                # Named md5_value to avoid shadowing the md5 filter above
                md5_value = hashlib.md5(param_value.encode('utf-8')).hexdigest()
                values.append(md5_value)
                all_values[md5_value] = {
                    "index": index,
                }

            index += 1

        # When in here, it means it should be unique
        # Should this be done by the backend? E.g. ask it if the value is valid?
        # 1. Check if it's unique towards key:value store in org for action
        # 2. Check if COMBINATION is unique towards key:value store of action for org
        # 3. Have a workflow configuration for unique ID's in unison or per field? E.g. if toggled, then send a hash of all fields together alphabetically, but if not, send one field at a time

        new_params = []
        if len(values) > 0:
            org_id = self.full_execution["workflow"]["execution_org"]["id"]
            data = {
                "append": True,
                "workflow_check": False,
                "authorization": self.authorization,
                "execution_ref": self.current_execution_id,
                "org_id": org_id,
                "values": [{
                    "app": self.action["app_name"],
                    "action": self.action["name"],
                    "parameternames": param_names,
                    "parametervalues": values,
                }]
            }

            url = f"{self.url}/api/v1/orgs/{org_id}/validate_app_values"
            ret = requests.post(url, json=data, verify=False)
            if ret.status_code == 200:
                json_value = ret.json()
                if len(json_value["found"]) > 0:
                    modifier = 0
                    for item in json_value["found"]:
                        self.logger.info(f"Should remove {item}")

                        try:
                            self.logger.info(f"FOUND: {all_values[item]}")
                            self.logger.info(f"SHOULD REMOVE INDEX: {all_values[item]['index']}")

                            try:
                                newlist.pop(all_values[item]["index"]-modifier)
                                modifier += 1
                            except IndexError as e:
                                self.logger.info(f"Error popping value from array: {e}")
                        except (NameError, KeyError) as e:
                            self.logger.info(f"Failed removal: {e}")

                else:
                    self.logger.info("None of the items were found!")
                    return newlist
            else:
                self.logger.info(f"[WARNING] Failed checking values with status code {ret.status_code}!")

        return newlist
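
    # Illustrative: with two unique_toggled parameters valued "a" and "b", the
    # combination hashes as md5("ab"); the backend echoes already-seen hashes
    # back in json_value["found"], and those iterations are popped above.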

    # Returns a list of all the executions to be done in the inner loop
    # FIXME: Doesn't take into account whether you actually WANT to loop or not
    # Check if the last part of the value is #?
    #async def get_param_multipliers(self, baseparams):
    def get_param_multipliers(self, baseparams):
        # Example:
        # {'call': ['hello', 'hello4'], 'call2': ['hello2', 'hello3'], 'call3': '1'}
        #
        # Should become this because of pairs (all same-length arrays PROBABLY indicate the same source node's values):
        # [
        #  {'call': 'hello', 'call2': 'hello2', 'call3': '1'},
        #  {'call': 'hello4', 'call2': 'hello3', 'call3': '1'}
        # ]
        #
        # ----------------------------------------------------------------------
        # Example 2:
        # {'call': ['hello'], 'call2': ['hello2', 'hello3'], 'call3': '1'}
        #
        # Should become this because NOT pairs/triplets:
        # [
        #  {'call': 'hello', 'call2': 'hello2', 'call3': '1'},
        #  {'call': 'hello', 'call2': 'hello3', 'call3': '1'}
        # ]
        #
        # ----------------------------------------------------------------------
        # Example 3:
        # {'call': ['hello', 'hello2'], 'call2': ['hello3', 'hello4', 'hello5'], 'call3': '1'}
        #
        # Should become this because the arrays are NOT the same length, aka no pairs/triplets. This is the multiplier effect: 2x3 arrays = 6 iterations.
        # [
        #  {'call': 'hello', 'call2': 'hello3', 'call3': '1'},
        #  {'call': 'hello', 'call2': 'hello4', 'call3': '1'},
        #  {'call': 'hello', 'call2': 'hello5', 'call3': '1'},
        #  {'call': 'hello2', 'call2': 'hello3', 'call3': '1'},
        #  {'call': 'hello2', 'call2': 'hello4', 'call3': '1'},
        #  {'call': 'hello2', 'call2': 'hello5', 'call3': '1'}
        # ]
        # To achieve this:
        # 1. For the first array, take the total amount y (2x3=6) and divide it by the current array length x: 6/2 = 3. This means do 3 of each value.
        # 2. For the second array, take the total amount y (2x3=6) and divide it by the current array length x: 6/3 = 2.
        #
        # Example 4:
        # What if there are multiple loops inside a single item?

        paramlist = []
        listitems = []
        listlengths = []
        all_lists = []
        all_list_keys = []

        loopnames = []
        for key, value in baseparams.items():
            check_value = ""
            for param in self.original_action["parameters"]:
                if param["name"] == key:
                    check_value = param["value"]

                    octothorpe_count = param["value"].count(".#")
                    if octothorpe_count > self.result_wrapper_count:
                        self.result_wrapper_count = octothorpe_count
                        self.logger.info("[INFO] NEW OCTOTHORPE WRAPPER: %d" % octothorpe_count)

            # This whole thing is hard.
            # item = [{"data": "1.2.3.4", "dataType": "ip"}]
            # $item = DONT loop items.
            # $item.# = Loop items
            # $item.#.data = Loop items
            # With a single item, this is fine.

            # item = [{"list": [{"data": "1.2.3.4", "dataType": "ip"}]}]
            # $item = DONT loop items
            # $item.# = Loop items
            # $item.#.list = DONT loop items
            # $item.#.list.# = Loop items
            # $item.#.list.#.data = Loop items
            # If the item itself is a list.. hmm

            # FIXME: Check the above, and fix so that nested looped items can be
            # skipped if wanted

            should_merge = False
            if "#" in check_value:
                should_merge = True

            # Specific for OpenAPI body replacement
            if not should_merge:
                for parameter in self.original_action["parameters"]:
                    if parameter["name"] == key:
                        try:
                            values = parameter["value_replace"]
                            if values != None:
                                self.logger.info(values)
                                for val in values:
                                    if "#" in val["value"]:
                                        should_merge = True
                                        break
                        except:
                            pass

            if isinstance(value, list):
                self.logger.info(f"[DEBUG] Item {value} is a list.")
                if len(value) <= 1:
                    if len(value) == 1:
                        baseparams[key] = value[0]
                else:
                    if not should_merge:
                        self.logger.info("[DEBUG] Adding WITHOUT looping list")
                    else:
                        if len(value) not in listlengths:
                            listlengths.append(len(value))

                        listitems.append(
                            {
                                key: len(value)
                            }
                        )

                        all_list_keys.append(key)
                        all_lists.append(baseparams[key])
            else:
                #self.logger.info(f"{value} is not a list")
                pass

        self.logger.info("[DEBUG] Listlengths: %s" % listlengths)
        if len(listlengths) == 0:
            self.logger.info("[DEBUG] NO multiplier. Running a single iteration.")
            paramlist.append(baseparams)
        elif len(listlengths) == 1:
            self.logger.info("[DEBUG] NO MULTIPLIER NECESSARY. Length is %d" % len(listitems))

            for item in listitems:
                # This loop should always be length 1
                for key, value in item.items():
                    if isinstance(value, int):
                        self.logger.info("\n[DEBUG] Should run key %s %d times from %s" % (key, value, baseparams[key]))
                        if len(paramlist) == value:
                            self.logger.info("[DEBUG] List ALREADY exists - just changing values")
                            for subloop in range(value):
                                paramlist[subloop][key] = baseparams[key][subloop]
                        else:
                            self.logger.info("[DEBUG] List DOESNT exist - ADDING values")
                            for subloop in range(value):
                                baseitem = copy.deepcopy(baseparams)
                                baseitem[key] = baseparams[key][subloop]
                                paramlist.append(baseitem)

        else:
            self.logger.info("[DEBUG] Multipliers to handle: %s" % listitems)
            newlength = 1
            for item in listitems:
                for key, value in item.items():
                    newlength = newlength * value

            self.logger.info("[DEBUG] Newlength of array: %d. Lists: %s" % (newlength, all_lists))
            # Get the cartesian product of the arrays
            cartesian = self.cartesian_product(all_lists)
            newlist = []
            for item in cartesian:
                newlist.append(list(item))

            for subitem in range(len(newlist)):
                baseitem = copy.deepcopy(baseparams)
                for key in range(len(newlist[subitem])):
                    baseitem[all_list_keys[key]] = newlist[subitem][key]

                paramlist.append(baseitem)

        return paramlist
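
    # Illustrative: the ".#" (octothorpe) counting above means
    # "$item.#.list.#.data".count(".#") == 2, i.e. two loop levels.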

    # Runs recursed versions with inner loops and such
    #async def run_recursed_items(self, func, baseparams, loop_wrapper):
    def run_recursed_items(self, func, baseparams, loop_wrapper):
        has_loop = False

        newparams = {}
        for key, value in baseparams.items():
            if isinstance(value, list) and len(value) > 0:
                self.logger.info(f"[DEBUG] In list check for {key}")

                try:
                    # Added skip for body (OpenAPI) which uses data= in requests
                    # Can be screwed up if they name theirs body too
                    if key != "body":
                        value[0] = json.loads(value[0])
                except json.decoder.JSONDecodeError as e:
                    self.logger.info("[WARNING] JSON casting error (1): %s" % e)
                except TypeError as e:
                    self.logger.info("[WARNING] TypeError: %s" % e)

            self.logger.info("[DEBUG] POST initial list check")

            try:
                if isinstance(value, list) and len(value) == 1 and isinstance(value[0], list):
                    try:
                        loop_wrapper[key] += 1
                    except Exception as e:
                        self.logger.info(f"[WARNING] Exception in loop wrapper: {e}")
                        loop_wrapper[key] = 1

                    self.logger.info(f"[DEBUG] Key {key} is a list: {value}")
                    newparams[key] = value[0]
                    has_loop = True
                else:
                    newparams[key] = value
            except Exception as e:
                self.logger.info(f"[WARNING] Error in baseparams list: {e}")
                newparams[key] = value

        results = []
        if has_loop:
            self.logger.info(f"[DEBUG] Should run inner loop")
            ret = self.run_recursed_items(func, newparams, loop_wrapper)
        else:
            self.logger.info(f"[DEBUG] Should run multiplier check with params (inner)")
            # 1. Find the loops that are required and create new multipliers
            # If here: check for multipliers within this scope.
            ret = []
            param_multiplier = self.get_param_multipliers(newparams)

            # FIXME: This does a deduplication of the data
            new_params = self.validate_unique_fields(param_multiplier)
            if len(new_params) == 0:
                self.logger.info("[WARNING] SHOULD STOP MULTI-EXECUTION BECAUSE FIELDS AREN'T UNIQUE")
                self.action_result = {
                    "action": self.action,
                    "authorization": self.authorization,
                    "execution_id": self.current_execution_id,
                    "result": f"All {len(param_multiplier)} values were non-unique",
                    "started_at": self.start_time,
                    "status": "SKIPPED",
                    "completed_at": int(time.time()),
                }

                self.send_result(self.action_result, {"Content-Type": "application/json", "Authorization": "Bearer %s" % self.authorization}, "/api/v1/streams")
                if runtime != "run":
                    exit()
                else:
                    return
            else:
                param_multiplier = new_params

            self.logger.info("[INFO] Multiplier length: %d" % len(param_multiplier))
            for subparams in param_multiplier:
                try:
                    while True:
                        try:
                            tmp = func(**subparams)
                            break
                        except TypeError as e:
                            self.logger.info("BASE TYPEERROR: %s" % e)
                            errorstring = "%s" % e
                            if "got an unexpected keyword argument" in errorstring:
                                fieldsplit = errorstring.split("'")
                                if len(fieldsplit) > 1:
                                    field = fieldsplit[1]

                                    try:
                                        del subparams[field]
                                        self.logger.info("Removed invalid field %s (1)" % field)
                                    except KeyError:
                                        break
                            else:
                                raise Exception(json.dumps({
                                    "success": False,
                                    "reason": "You may be running an old version of this action. Please delete and remake the node.",
                                    "exception": f"TypeError: {e}",
                                }))

                except:
                    e = ""
                    try:
                        e = sys.exc_info()[1]
                    except:
                        self.logger.info("Exec check fail: %s" % e)

                    tmp = json.dumps({
                        "success": False,
                        "reason": f"An error occurred during execution: {e}",
                    })

                # An attempt at decomposing coroutine results
                # Backwards compatibility
                try:
                    if asyncio.iscoroutine(tmp):
                        self.logger.info("[DEBUG] In coroutine (2)")
                        async def parse_value(tmp):
                            value = await asyncio.gather(
                                tmp
                            )

                            return value[0]

                        tmp = asyncio.run(parse_value(tmp))
                except Exception as e:
                    self.logger.warning(f"[ERROR] Failed to parse coroutine value for old app: {e}")

                new_value = tmp
                if tmp == None:
                    new_value = ""
                elif isinstance(tmp, dict):
                    new_value = json.dumps(tmp)
                elif isinstance(tmp, list):
                    new_value = json.dumps(tmp)

                try:
                    new_value = json.loads(new_value)
                except json.decoder.JSONDecodeError as e:
                    pass
                except TypeError as e:
                    pass
                except:
                    pass

                ret.append(new_value)

            self.logger.info("[INFO] Ret length: %d" % len(ret))
            if len(ret) == 1:
                # Single-item lists are kept as lists on purpose
                self.logger.info("[DEBUG] DONT make list of 1 into 0!!")

        self.logger.info("Return from execution: %s" % ret)
        if ret == None:
            results.append("")
            json_object = False
        elif isinstance(ret, dict):
            results.append(ret)
            json_object = True
        elif isinstance(ret, list):
            results = ret
            json_object = True
        else:
            ret = ret.replace("\"", "\\\"", -1)

            try:
                results.append(json.loads(ret))
                json_object = True
            except json.decoder.JSONDecodeError as e:
                results.append(ret)
            except TypeError as e:
                results.append(ret)
            except:
                results.append(ret)

        if len(results) == 1:
            # Single-item results are kept as lists on purpose
            self.logger.info("DONT MAKE LIST FROM 1 TO 0!!")

        self.logger.info("\nLOOP: %s\nRESULTS: %s" % (loop_wrapper, results))
        return results
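
    # Illustrative: {"ip": [["1.1.1.1", "8.8.8.8"]]} is a list-in-list, so one
    # recursion level strips the outer list before the multiplier check runs.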

    # Downloads all file IDs from a namespace
    # Currently only working on local version of Shuffle
    def get_file_category_ids(self, category):
        org_id = self.full_execution["workflow"]["execution_org"]["id"]

        get_path = "/api/v1/files/namespaces/%s?execution_id=%s&ids=true" % (category, self.full_execution["execution_id"])
        headers = {
            "Authorization": "Bearer %s" % self.authorization,
            "User-Agent": "Shuffle 1.1.0",
        }

        ret = requests.get("%s%s" % (self.url, get_path), headers=headers, verify=False)
        return ret.json()
        #if ret1.status_code != 200:
        #    return {
        #        "success": False,
        #        "reason": "Status code is %d from backend for category %s" % category,
        #        "list": [],
        #    }
        #
        #return {
        #    "success": True,
        #    "ids": ret1.json(),
        #}

    # Downloads all files from a namespace
    # Currently only working on local version of Shuffle
    def get_file_namespace(self, namespace):
        org_id = self.full_execution["workflow"]["execution_org"]["id"]

        get_path = "/api/v1/files/namespaces/%s?execution_id=%s" % (namespace, self.full_execution["execution_id"])
        headers = {
            "Authorization": "Bearer %s" % self.authorization,
            "User-Agent": "Shuffle 1.1.0",
        }

        ret1 = requests.get("%s%s" % (self.url, get_path), headers=headers, verify=False)
        if ret1.status_code != 200:
            return None

        filebytes = BytesIO(ret1.content)
        myzipfile = zipfile.ZipFile(filebytes)

        # Unzip and build here!
        #for member in files.namelist():
        #    filename = os.path.basename(member)
        #    if not filename:
        #        continue
        #
        #    self.logger.info("File: %s" % member)
        #    source = files.open(member)
        #    with open("%s/%s" % (basedir, source.name), "wb+") as tmp:
        #        filedata = source.read()
        #        self.logger.info("Filedata (%s): %s" % (source.name, filedata))
        #        tmp.write(filedata)

        return myzipfile

    def get_file_namespace_ids(self, namespace):
        # Bound method: self is passed implicitly
        return self.get_file_category_ids(namespace)

    def get_file_category(self, category):
        # Bound method: self is passed implicitly
        return self.get_file_namespace(category)

    # Things to consider for files:
    # - How can you download / stream a file?
    # - Can you decide if you want a stream or the files directly?
    def get_file(self, value):
        full_execution = self.full_execution
        org_id = full_execution["workflow"]["execution_org"]["id"]

        self.logger.info("SHOULD GET FILES BASED ON ORG %s, workflow %s and value(s) %s" % (org_id, full_execution["workflow"]["id"], value))

        if isinstance(value, list):
            self.logger.info("IS LIST!")
        else:
            value = [value]

        returns = []
        for item in value:
            self.logger.info("VALUE: %s" % item)
            # File IDs are 36-char UUIDs or prefixed with "file_"
            if len(item) != 36 and not item.startswith("file_"):
                self.logger.info("Bad length for file value %s" % item)
                continue

            get_path = "/api/v1/files/%s?execution_id=%s" % (item, full_execution["execution_id"])
            headers = {
                "Content-Type": "application/json",
                "Authorization": "Bearer %s" % self.authorization,
                "User-Agent": "Shuffle 1.1.0",
            }

            ret1 = requests.get("%s%s" % (self.url, get_path), headers=headers, verify=False)
            self.logger.info("RET1 (file get): %s" % ret1.text)
            if ret1.status_code != 200:
                returns.append({
                    "filename": "",
                    "data": "",
                    "success": False,
                })
                continue

            content_path = "/api/v1/files/%s/content?execution_id=%s" % (item, full_execution["execution_id"])
            ret2 = requests.get("%s%s" % (self.url, content_path), headers=headers, verify=False)
            self.logger.info("RET2 (file get) done")
            if ret2.status_code == 200:
                tmpdata = ret1.json()
                returndata = {
                    "success": True,
                    "filename": tmpdata["filename"],
                    "data": ret2.content,
                }
                returns.append(returndata)

            self.logger.info("RET3 (file get done)")

        if len(returns) == 0:
            return {
                "success": False,
                "filename": "",
                "data": b"",
            }
        elif len(returns) == 1:
            return returns[0]
        else:
            return returns
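
    # Illustrative: file references are 36-char UUIDs or "file_"-prefixed IDs
    #   ret = self.get_file(some_file_id)   # -> {"success": True, "filename": ..., "data": b"..."}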

    def delete_cache(self, key):
        org_id = self.full_execution["workflow"]["execution_org"]["id"]
        url = "%s/api/v1/orgs/%s/delete_cache" % (self.url, org_id)

        data = {
            "workflow_id": self.full_execution["workflow"]["id"],
            "execution_id": self.current_execution_id,
            "authorization": self.authorization,
            "org_id": org_id,
            "key": key,
        }

        response = requests.post(url, json=data, verify=False)
        try:
            allvalues = response.json()
            return json.dumps(allvalues)
        except Exception as e:
            self.logger.info("[ERROR] Failed to parse response from delete_cache: %s" % e)
            return json.dumps({"success": False, "reason": f"Failed to delete cache for key '{key}'"})

    def set_cache(self, key, value):
        org_id = self.full_execution["workflow"]["execution_org"]["id"]
        url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
        data = {
            "workflow_id": self.full_execution["workflow"]["id"],
            "execution_id": self.current_execution_id,
            "authorization": self.authorization,
            "org_id": org_id,
            "key": key,
            "value": str(value),
        }

        response = requests.post(url, json=data, verify=False)
        try:
            allvalues = response.json()
            allvalues["key"] = key
            allvalues["value"] = str(value)
            return allvalues
        except Exception as e:
            self.logger.info("[ERROR] Failed to parse response from set_cache: %s" % e)
            return {"success": False}

    def get_cache(self, key):
        org_id = self.full_execution["workflow"]["execution_org"]["id"]
        url = "%s/api/v1/orgs/%s/get_cache" % (self.url, org_id)
        data = {
            "workflow_id": self.full_execution["workflow"]["id"],
            "execution_id": self.current_execution_id,
            "authorization": self.authorization,
            "org_id": org_id,
            "key": key,
        }

        value = requests.post(url, json=data, verify=False)
        try:
            allvalues = value.json()
            self.logger.info("VAL1: %s", allvalues)
            allvalues["key"] = key
            self.logger.info("VAL2: %s", allvalues)

            try:
                parsedvalue = json.loads(allvalues["value"])
                allvalues["value"] = parsedvalue
            except:
                self.logger.info("Parsing of value as JSON failed. Continue anyway!")

            return allvalues
        except:
            self.logger.info("Value couldn't be parsed, or json dump of value failed")
            return {"success": False}

    # Wrapper for set_files
    def set_file(self, infiles):
        return self.set_files(infiles)

    # Sets files in the backend
    def set_files(self, infiles):
        full_execution = self.full_execution
        workflow_id = full_execution["workflow"]["id"]
        org_id = full_execution["workflow"]["execution_org"]["id"]
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer %s" % self.authorization,
            "User-Agent": "Shuffle 1.1.0",
        }

        if not isinstance(infiles, list):
            infiles = [infiles]

        create_path = "/api/v1/files/create?execution_id=%s" % full_execution["execution_id"]
        file_ids = []
        for curfile in infiles:
            filename = "unspecified"
            data = {
                "filename": filename,
                "workflow_id": workflow_id,
                "org_id": org_id,
            }

            try:
                data["filename"] = curfile["filename"]
                filename = curfile["filename"]
            except KeyError as e:
                self.logger.info(f"KeyError in file setup: {e}")

            ret = requests.post("%s%s" % (self.url, create_path), headers=headers, json=data, verify=False)
            cur_id = ""
            if ret.status_code == 200:
                ret_json = ret.json()
                if not ret_json["success"]:
                    self.logger.info("Not success in file upload creation.")
                    continue

                self.logger.info("Should handle ID %s" % ret_json["id"])
                file_ids.append(ret_json["id"])
                cur_id = ret_json["id"]
            else:
                self.logger.info("Bad status code: %d" % ret.status_code)
                continue

            if len(cur_id) == 0:
                self.logger.info("No file ID specified from backend")
                continue

            new_headers = {
                "Authorization": f"Bearer {self.authorization}",
                "User-Agent": "Shuffle 1.1.0",
            }

            upload_path = "/api/v1/files/%s/upload?execution_id=%s" % (cur_id, full_execution["execution_id"])
            self.logger.info("Upload path: %s" % upload_path)

            files = {"shuffle_file": (filename, curfile["data"])}

            ret = requests.post("%s%s" % (self.url, upload_path), files=files, headers=new_headers, verify=False)
            self.logger.info("Ret UPLOAD: %s" % ret.text)
            self.logger.info("Ret2 UPLOAD: %d" % ret.status_code)

        self.logger.info("IDS TO RETURN: %s" % file_ids)
        return file_ids
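
    # Illustrative: upload one in-memory file and get its backend file ID back
    #   file_ids = self.set_files([{"filename": "report.txt", "data": b"hello"}])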

    #async def execute_action(self, action):
    def execute_action(self, action):
        # !!! Let this line stay - its used for some horrible codegeneration / stitching !!! #
        #STARTCOPY
        stream_path = "/api/v1/streams"
        self.action_result = {
            "action": action,
            "authorization": self.authorization,
            "execution_id": self.current_execution_id,
            "result": "",
            "started_at": int(time.time()),
            "status": "EXECUTING"
        }

        # Simple validation of parameters in general
        replace_params = False
        try:
            tmp_parameters = action["parameters"]
            for param in tmp_parameters:
                if param["value"] == "SHUFFLE_AUTO_REMOVED":
                    replace_params = True
        except KeyError:
            action["parameters"] = []
        except TypeError:
            pass

        self.action = copy.deepcopy(action)
        self.logger.info(f"[DEBUG] Sending starting action result (EXECUTING). Param replace: {replace_params}")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.authorization}",
            "User-Agent": "Shuffle 1.1.0",
        }

        if len(self.action) == 0:
            self.logger.info("[WARNING] ACTION env not defined")
            self.action_result["result"] = "Error in setup ENV: ACTION not defined"
            self.send_result(self.action_result, headers, stream_path)
            return

        if len(self.authorization) == 0:
            self.logger.info("[WARNING] AUTHORIZATION env not defined")
            self.action_result["result"] = "Error in setup ENV: AUTHORIZATION not defined"
            self.send_result(self.action_result, headers, stream_path)
            return

        if len(self.current_execution_id) == 0:
            self.logger.info("[WARNING] EXECUTIONID env not defined")
            self.action_result["result"] = "Error in setup ENV: EXECUTIONID not defined"
            self.send_result(self.action_result, headers, stream_path)
            return

        # FIXME: Shouldn't skip the initial stream POST, but it's good for minimizing API calls
        #try:
        #    ret = requests.post("%s%s" % (self.base_url, stream_path), headers=headers, json=action_result, verify=False)
        #    self.logger.info("Workflow: %d" % ret.status_code)
        #    if ret.status_code != 200:
        #        self.logger.info(ret.text)
        #except requests.exceptions.ConnectionError as e:
        #    self.logger.info("Connectionerror: %s" % e)
        #    action_result["result"] = "Bad setup during startup: %s" % e
        #    self.send_result(action_result, headers, stream_path)
        #    return

        # Verify whether there are any parameters with ACTION_RESULT required
        # If found, we get the full results list from the backend
        fullexecution = {}
        if isinstance(self.full_execution, str) and len(self.full_execution) == 0:
            try:
                failed = False
                rettext = ""
                for i in range(0, 5):
                    tmpdata = {
                        "authorization": self.authorization,
                        "execution_id": self.current_execution_id
                    }

                    self.logger.info("[DEBUG] Before FULLEXEC stream result")
                    ret = requests.post(
                        "%s/api/v1/streams/results" % (self.base_url),
                        headers=headers,
                        json=tmpdata,
                        verify=False
                    )

                    if ret.status_code == 200:
                        fullexecution = ret.json()
                        failed = False
                        break

                    elif ret.status_code >= 400:
                        self.logger.info("[ERROR] (fails: %d) Error in app with status code %d for results (1). RETRYING because results can't be handled" % (i+1, ret.status_code))

                        rettext = ret.text
                        failed = True
                        time.sleep(8)
                        continue

                    else:
                        self.logger.info("[ERROR] Error in app with status code %d for results (2). Crashing because results can't be handled" % ret.status_code)

                        rettext = ret.text
                        failed = True
                        time.sleep(8)
                        break

                if failed:
                    self.action_result["result"] = json.dumps({
                        "success": False,
                        "reason": f"Bad result from backend during startup of app: {ret.status_code}",
                        "extended_reason": f"{rettext}"
                    })

                    self.send_result(self.action_result, headers, stream_path)
                    return

            except requests.exceptions.ConnectionError as e:
                self.logger.info("[ERROR] FullExec Connectionerror: %s" % e)
                self.action_result["result"] = json.dumps({
                    "success": False,
                    "reason": f"Connection error during startup: {e}"
                })

                self.send_result(self.action_result, headers, stream_path)
                return
        else:
            self.logger.info(f"[DEBUG] Setting execution to default value with type {type(self.full_execution)}")
            try:
                fullexecution = json.loads(self.full_execution)
            except json.decoder.JSONDecodeError as e:
                self.logger.info("[ERROR] Json decode execution error: %s" % e)
                self.action_result["result"] = "Json error during startup: %s" % e
                self.send_result(self.action_result, headers, stream_path)
                return

        self.full_execution = fullexecution

        #try:
        #    if "backend_url" in self.full_execution:
        #        self.url = self.full_execution["backend_url"]
        #        self.base_url = self.full_execution["backend_url"]
        #except KeyError:
        #    pass

        try:
            if replace_params == True:
                for inner_action in self.full_execution["workflow"]["actions"]:
                    self.logger.info("[DEBUG] ID: %s vs %s" % (inner_action["id"], self.action["id"]))

                    # In case of some kind of magic, we're just doing params
                    if inner_action["id"] == self.action["id"]:
                        self.logger.info("FOUND!")

                        if isinstance(self.action, str):
                            self.logger.info("Params is in string object for self.action?")
                        else:
                            self.action["parameters"] = inner_action["parameters"]
                            self.action_result["action"]["parameters"] = inner_action["parameters"]

                        if isinstance(self.original_action, str):
                            self.logger.info("Params for original actions is in string object?")
                        else:
                            self.original_action["parameters"] = inner_action["parameters"]

                        break

        except Exception as e:
            self.logger.info(f"[WARNING] Failed in replace params action parsing: {e}")

        self.logger.info(f"[DEBUG] AFTER FULLEXEC stream result (init): {self.current_execution_id}")

        # Gets the value at the parenthesis level you want
        def parse_nested_param(string, level):
            """
            Generate strings contained in nested (), indexing i = level
            """
            if len(re.findall(r"\(", string)) == len(re.findall(r"\)", string)):
                LeftRightIndex = [x for x in zip(
                    [Left.start()+1 for Left in re.finditer(r'\(', string)],
                    reversed([Right.start() for Right in re.finditer(r'\)', string)]))]

            elif len(re.findall(r"\(", string)) > len(re.findall(r"\)", string)):
                return parse_nested_param(string + ')', level)
            elif len(re.findall(r"\(", string)) < len(re.findall(r"\)", string)):
                return parse_nested_param('(' + string, level)
            else:
                return 'Failed to parse params'

            try:
                return [string[LeftRightIndex[level][0]:LeftRightIndex[level][1]]]
            except IndexError:
                return [string[LeftRightIndex[level+1][0]:LeftRightIndex[level+1][1]]]

        # Finds the deepest level parenthesis in a string
        def maxDepth(S):
            current_max = 0
            # Named max_count to avoid shadowing the builtin max()
            max_count = 0
            n = len(S)

            # Traverse the input string
            for i in range(n):
                if S[i] == '(':
                    current_max += 1

                    if current_max > max_count:
                        max_count = current_max
                elif S[i] == ')':
                    if current_max > 0:
                        current_max -= 1
                    else:
                        return -1

            # Finally check for an unbalanced string
            if current_max != 0:
                return -1

            return max_count-1
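
        # Illustrative: maxDepth("lower(trim( HI ))") == 1, maxDepth("int(1)") == 0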

        # Specific type parsing
        def parse_type(data, thistype):
            if data == None:
                return "Empty"

            if "int" in thistype or "number" in thistype:
                try:
                    return int(data)
                except ValueError:
                    return data

            if "lower" in thistype:
                return data.lower()
            if "upper" in thistype:
                return data.upper()
            if "trim" in thistype:
                return data.strip()
            if "strip" in thistype:
                return data.strip()
            if "split" in thistype:
                return data.split()
            if "replace" in thistype:
                splitvalues = data.split(",")

                if len(splitvalues) > 2:
                    for i in range(len(splitvalues)):
                        if i != 0:
                            if splitvalues[i] == " ":
                                splitvalues[i] = " "
                                continue

                            splitvalues[i] = splitvalues[i].strip()

                            if splitvalues[i] == "\"\"":
                                splitvalues[i] = ""
                            if splitvalues[i] == "\" \"":
                                splitvalues[i] = " "
                            if len(splitvalues[i]) > 2:
                                if splitvalues[i][0] == "\"" and splitvalues[i][len(splitvalues[i])-1] == "\"":
                                    splitvalues[i] = splitvalues[i][1:-1]
                                if splitvalues[i][0] == "'" and splitvalues[i][len(splitvalues[i])-1] == "'":
                                    splitvalues[i] = splitvalues[i][1:-1]

                    replacementvalue = splitvalues[0]
                    return replacementvalue.replace(splitvalues[1], splitvalues[2], -1)
                else:
                    return f"replace({data})"
            if "join" in thistype:
                try:
                    splitvalues = data.split(",")
                    if "," not in data:
                        return f"join({data})"

                    if len(splitvalues) >= 2:
                        # 1. Take the list and parse it from string
                        # 2. Take all the items and join them
                        # 3. Parse them back as string and return
                        values = ",".join(splitvalues[0:-1])
                        tmp = json.loads(values)
                        try:
                            newvalues = splitvalues[-1].join(str(item).strip() for item in tmp)
                        except TypeError:
                            newvalues = splitvalues[-1].join(json.dumps(item).strip() for item in tmp)

                        return newvalues
                    else:
                        return f"join({data})"

                except (KeyError, IndexError) as e:
                    print(f"ERROR in join(): {e}")
                except json.decoder.JSONDecodeError as e:
                    print(f"JSON ERROR in join(): {e}")

            # "lenght" is kept on purpose as a common-misspelling alias
            if "len" in thistype or "length" in thistype or "lenght" in thistype:
                try:
                    tmp_len = json.loads(data, parse_float=str, parse_int=str, parse_constant=str)
                except (NameError, KeyError, TypeError, json.decoder.JSONDecodeError) as e:
                    try:
                        data = data.replace("True", "true")
                        data = data.replace("False", "false")
                        data = data.replace("None", "null")
                        data = data.replace("\"", "\\\"")
                        data = data.replace("'", "\"")

                        tmp_len = json.loads(data, parse_float=str, parse_int=str, parse_constant=str)
                    except (NameError, KeyError, TypeError, json.decoder.JSONDecodeError) as e:
                        tmp_len = str(data)

                return str(len(tmp_len))

            if "parse" in thistype:
                splitvalues = []
                default_error = """Error. Expected syntax: parse(["hello","test1"],0:1)"""
                if "," in data:
                    splitvalues = data.split(",")

                    for item in range(len(splitvalues)):
                        splitvalues[item] = splitvalues[item].strip()
                else:
                    return default_error

                lastsplit = []
                if ":" in splitvalues[-1]:
                    lastsplit = splitvalues[-1].split(":")
                else:
                    try:
                        lastsplit = [int(splitvalues[-1])]
                    except ValueError:
                        return default_error

                try:
                    parsedlist = ",".join(splitvalues[0:-1])
                    if len(lastsplit) > 1:
                        tmp = json.loads(parsedlist)[int(lastsplit[0]):int(lastsplit[1])]
                    else:
                        tmp = json.loads(parsedlist)[lastsplit[0]]

                    return tmp
                except IndexError as e:
                    return default_error
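
        # Illustrative casts handled by parse_type:
        #   parse_type("HELLO", "lower")      -> "hello"
        #   parse_type('["a","b"],-', "join") -> "a-b"
        #   parse_type('["a","b"]', "len")    -> "2"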
|
|
|
|
# Parses the INNER value and recurses until everything is done
|
|
# Looks for a way to use e.g. int() or number() as a value
|
|
def parse_wrapper(data):
|
|
try:
|
|
if "(" not in data or ")" not in data:
|
|
return data, False
|
|
except TypeError:
|
|
return data, False
|
|
|
|
# Because liquid can handle ALL of this now.
|
|
# Implemented for >0.9.25
|
|
#self.logger.info("[DEBUG] Skipping parser because use of its been deprecated >0.9.25 due to Liquid implementation")
|
|
return data, False

    # NOTE: Everything below is unreachable after the early return above and is
    # kept for reference only.
    wrappers = ["int", "number", "lower", "upper", "trim", "strip", "split", "parse", "len", "length", "lenght", "join", "replace"]

    if not any(wrapper in data for wrapper in wrappers):
        return data, False

    # Do stuff here.
    inner_value = parse_nested_param(data, maxDepth(data) - 0)
    outer_value = parse_nested_param(data, maxDepth(data) - 1)

    wrapper_group = "|".join(wrappers)
    parse_string = data
    max_depth = maxDepth(parse_string)

    if outer_value != inner_value:
        for casting_items in reversed(range(max_depth + 1)):
            c_parentheses = parse_nested_param(parse_string, casting_items)[0]
            match_string = re.escape(c_parentheses)
            custom_casting = re.findall(fr"({wrapper_group})\({match_string}", parse_string)

            # no match; go to the next group
            if len(custom_casting) == 0:
                continue

            inner_result = parse_type(c_parentheses, custom_casting[0])

            # if the result is a string then keep parsing, else return
            if isinstance(inner_result, str):
                parse_string = parse_string.replace(f"{custom_casting[0]}({c_parentheses})", inner_result)
            elif isinstance(inner_result, list):
                parse_string = parse_string.replace(f"{custom_casting[0]}({c_parentheses})",
                        json.dumps(inner_result))
            else:
                parse_string = inner_result
                break
    else:
        c_parentheses = parse_nested_param(parse_string, 0)[0]
        match_string = re.escape(c_parentheses)
        custom_casting = re.findall(fr"({wrapper_group})\({match_string}", parse_string)
        print("[DEBUG] In ELSE: %s" % custom_casting)
        # check if a wrapper was found
        if len(custom_casting) != 0:
            inner_result = parse_type(c_parentheses, custom_casting[0])
            if isinstance(inner_result, str):
                parse_string = parse_string.replace(f"{custom_casting[0]}({c_parentheses})", inner_result)
            elif isinstance(inner_result, list):
                parse_string = parse_string.replace(f"{custom_casting[0]}({c_parentheses})",
                        json.dumps(inner_result))
            else:
                parse_string = inner_result

    #print("PARSE STRING: %s" % parse_string)
    return parse_string, True
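
# Example (sketch): with the early return in place, parse_wrapper is a no-op:
#   parse_wrapper("int(1)") -> ("int(1)", False)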

# Looks for parentheses to grab special cases within a string, e.g.:
# int(1) lower(HELLO) or length(what's the length)
# FIXME:
# There is an issue in here where it returns data wrong. Example:
# Authorization=Bearer authkey
# =
# Authorization=Bearer  authkey
# ^ Double space.
def parse_wrapper_start(data, self):
    try:
        data = parse_liquid(data, self)
    except:
        pass

    if "(" not in data or ")" not in data:
        return data

    if isinstance(data, str) and len(data) > 4:
        if (data[0] == "{" or data[0] == "[") and (data[len(data)-1] == "]" or data[len(data)-1] == "}"):
            self.logger.info("[DEBUG] Skipping parser because data is wrapped in {[ and ]}")
            return data

    # Split the string into space-separated chunks, keeping parentheses balanced
    newdata = []
    newstring = ""
    record = True
    paranCnt = 0
    charcnt = 0
    for char in data:
        if char == "(":
            charskip = False
            if charcnt > 0:
                if data[charcnt-1] == " ":
                    charskip = True

            if not charskip:
                paranCnt += 1

                if not record:
                    record = True

        if record:
            newstring += char

        if paranCnt == 0 and char == " ":
            newdata.append(newstring)
            newstring = ""
            record = True

        if char == ")":
            paranCnt -= 1

            if paranCnt == 0:
                record = False

        charcnt += 1

    if len(newstring) > 0:
        newdata.append(newstring)

    parsedlist = []
    non_string = False
    parsed = False
    for item in newdata:
        ret = parse_wrapper(item)
        if not isinstance(ret[0], str):
            non_string = True

        parsedlist.append(ret[0])
        if ret[1]:
            parsed = True

    if not parsed:
        return data

    if len(parsedlist) > 0 and not non_string:
        #self.logger.info("Returning parsed list: ", parsedlist)
        return " ".join(parsedlist)
    elif len(parsedlist) == 1 and non_string:
        return parsedlist[0]
    else:
        #self.logger.info("Casting back to string because multi: ", parsedlist)
        newlist = []
        for item in parsedlist:
            try:
                newlist.append(str(item))
            except ValueError:
                newlist.append("parsing_error")

        # Does this create the issue?
        return " ".join(newlist)

# Parses JSON loops and such down to the item you're looking for
# $nodename.#.id
# $nodename.data.#min-max.info.id
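# Examples (sketch, assuming a parsed result like {"data": [{"id": "a"}, {"id": "b"}]}):
#   recurse_json(basejson, ["data", "#", "id"])  -> (["a", "b"], True)
#   recurse_json(basejson, ["data", "#0", "id"]) -> ("a", False)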
def recurse_json(basejson, parsersplit):
    match = "#([0-9a-z]+):?-?([0-9a-z]+)?#?"
    try:
        outercnt = 0

        # Loops over split values
        for value in parsersplit:
            #if " " in value:
            #    value = value.replace(" ", "_", -1)

            actualitem = re.findall(match, value, re.MULTILINE)
            # Goes here if loop
            if value == "#":
                newvalue = []
                for innervalue in basejson:
                    # 1. Check the next item (message)
                    # 2. Call this function again

                    try:
                        ret, is_loop = recurse_json(innervalue, parsersplit[outercnt+1:])
                    except IndexError:
                        # Only in here if it's the last loop without anything in it?
                        ret, is_loop = recurse_json(innervalue, parsersplit[outercnt:])

                    newvalue.append(ret)

                # Magical way of returning which makes the app sdk identify
                # it as multi execution
                return newvalue, True

            # Checks specific regex like #1-2 for index 1-2 in a loop
            elif len(actualitem) > 0:
                is_loop = True
                newvalue = []
                firstitem = actualitem[0][0]
                seconditem = actualitem[0][1]
                if isinstance(firstitem, int):
                    firstitem = str(firstitem)
                if isinstance(seconditem, int):
                    seconditem = str(seconditem)

                #print("[DEBUG] ACTUAL PARSED: %s" % actualitem)

                # Means it's a single item -> continue
                if seconditem == "":
                    print("[INFO] In first - handling %s. Len: %d" % (firstitem, len(basejson)))
                    if firstitem.lower() == "max" or firstitem.lower() == "last" or firstitem.lower() == "end":
                        firstitem = len(basejson)-1
                    elif firstitem.lower() == "min" or firstitem.lower() == "first":
                        firstitem = 0
                    else:
                        firstitem = int(firstitem)

                    print(f"[DEBUG] Post lower checks with item {firstitem}")
                    tmpitem = basejson[int(firstitem)]
                    try:
                        newvalue, is_loop = recurse_json(tmpitem, parsersplit[outercnt+1:])
                    except IndexError:
                        # Fall back to the raw item if the recursion fails
                        newvalue, is_loop = tmpitem, False
                else:
                    print("[INFO] In ELSE - handling %s and %s" % (firstitem, seconditem))
                    if isinstance(firstitem, str):
                        if firstitem.lower() == "max" or firstitem.lower() == "last" or firstitem.lower() == "end":
                            firstitem = len(basejson)-1
                        elif firstitem.lower() == "min" or firstitem.lower() == "first":
                            firstitem = 0
                        else:
                            firstitem = int(firstitem)
                    else:
                        firstitem = int(firstitem)

                    if isinstance(seconditem, str):
                        if seconditem.lower() == "max" or seconditem.lower() == "last" or seconditem.lower() == "end":
                            seconditem = len(basejson)-1
                        elif seconditem.lower() == "min" or seconditem.lower() == "first":
                            seconditem = 0
                        else:
                            seconditem = int(seconditem)
                    else:
                        seconditem = int(seconditem)

                    print(f"[DEBUG] Post lower checks 2: {firstitem} AND {seconditem}")
                    newvalue = []
                    # Clamp to the last valid index since the range below is inclusive
                    if int(seconditem) > len(basejson)-1:
                        seconditem = len(basejson)-1

                    for i in range(int(firstitem), int(seconditem)+1):
                        # 1. Check the next item (message)
                        # 2. Call this function again
                        #self.logger.info("Base: %s" % basejson[i])

                        try:
                            ret, tmp_loop = recurse_json(basejson[i], parsersplit[outercnt+1:])
                        except IndexError:
                            print("[DEBUG] INDEXERROR: ", parsersplit[outercnt])
                            #ret = innervalue
                            ret, tmp_loop = recurse_json(basejson[i], parsersplit[outercnt:])

                        newvalue.append(ret)

                return newvalue, is_loop

            else:
                if len(value) == 0:
                    return basejson, False

                try:
                    if isinstance(basejson, list):
                        print("[WARNING] VALUE IN ISINSTANCE IS NOT TO BE USED (list): %s" % value)
                        return basejson, False
                    elif isinstance(basejson[value], str):
                        try:
                            if (basejson[value].startswith("{") and basejson[value].endswith("}")) or (basejson[value].startswith("[") and basejson[value].endswith("]")):
                                basejson = json.loads(basejson[value])
                            else:
                                return str(basejson[value]), False
                        except json.decoder.JSONDecodeError:
                            return str(basejson[value]), False
                    else:
                        basejson = basejson[value]
                except KeyError as e:
                    print("[WARNING] Running secondary value check with replacement of underscore in %s: %s" % (value, e))
                    if "_" in value:
                        value = value.replace("_", " ", -1)
                    elif " " in value:
                        value = value.replace(" ", "_", -1)

                    if isinstance(basejson, list):
                        print("[WARNING] VALUE IN ISINSTANCE IS NOT TO BE USED (list): %s" % value)
                        return basejson, False
                    elif isinstance(basejson[value], str):
                        print("[INFO] LOADING STRING '%s' AS JSON" % basejson[value])
                        try:
                            print("[DEBUG] BASEJSON: %s" % basejson)
                            if (basejson[value].startswith("{") and basejson[value].endswith("}")) or (basejson[value].startswith("[") and basejson[value].endswith("]")):
                                basejson = json.loads(basejson[value])
                            else:
                                return str(basejson[value]), False
                        except json.decoder.JSONDecodeError:
                            print("[DEBUG] RETURNING BECAUSE '%s' IS A NORMAL STRING (1)" % basejson[value])
                            return str(basejson[value]), False
                    else:
                        basejson = basejson[value]

            outercnt += 1

    except KeyError as e:
        print("[INFO] Lower keyerror: %s" % e)
        return "", False

        #return basejson
        #return "KeyError: Couldn't find key: %s" % e

    return basejson, False

# Takes a workflow execution as argument
# Returns a string if the result is single, or a list if it's a list
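# Example (sketch, assuming an action labeled "Repeat_back_to_me" whose result
# is '{"body": {"id": 1}}'):
#   get_json_value(execution_data, "$repeat_back_to_me.body.id") -> ("1", False)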
def get_json_value(execution_data, input_data):
    parsersplit = input_data.split(".")
    actionname_lower = parsersplit[0][1:].lower()

    #Actionname: Start_node
    #print(f"\n[INFO] Actionname: {actionname_lower}")

    # 1. Find the action
    baseresult = ""

    appendresult = ""
    #print("[INFO] Parsersplit length: %d" % len(parsersplit))
    if (actionname_lower.startswith("exec ") or actionname_lower.startswith("webhook ") or actionname_lower.startswith("schedule ") or actionname_lower.startswith("userinput ") or actionname_lower.startswith("email_trigger ") or actionname_lower.startswith("trigger ")) and len(parsersplit) == 1:
        record = False
        for char in actionname_lower:
            if char == " ":
                record = True

            if record:
                appendresult += char

        actionname_lower = "exec"
    elif actionname_lower.startswith("shuffle_cache ") or actionname_lower.startswith("shuffle_db "):
        actionname_lower = "shuffle_cache"

    actionname_lower = actionname_lower.replace(" ", "_", -1)

    try:
        if actionname_lower == "exec" or actionname_lower == "webhook" or actionname_lower == "schedule" or actionname_lower == "userinput" or actionname_lower == "email_trigger" or actionname_lower == "trigger":
            baseresult = execution_data["execution_argument"]
        elif actionname_lower == "shuffle_cache":
            print("[DEBUG] SHOULD GET CACHE KEY: %s" % parsersplit)
            if len(parsersplit) > 1:
                actual_key = parsersplit[1]
                print("[DEBUG] KEY: %s" % actual_key)
                cachedata = self.get_cache(actual_key)
                print("CACHE: %s" % cachedata)
                parsersplit.pop(1)
                try:
                    baseresult = json.dumps(cachedata)
                except json.decoder.JSONDecodeError as e:
                    print("[WARNING] Failed json dumping: %s" % e)

        else:
            if execution_data["results"] != None:
                for result in execution_data["results"]:
                    resultlabel = result["action"]["label"].replace(" ", "_", -1).lower()
                    if resultlabel.lower() == actionname_lower:
                        baseresult = result["result"]
                        break
            else:
                print("[DEBUG] No results to get values from.")
                baseresult = "$" + parsersplit[0][1:]

            if len(baseresult) == 0:
                try:
                    for variable in execution_data["workflow"]["workflow_variables"]:
                        variablename = variable["name"].replace(" ", "_", -1).lower()

                        if variablename.lower() == actionname_lower:
                            baseresult = variable["value"]
                            break

                except KeyError as e:
                    #print("[INFO] KeyError wf variables: %s" % e)
                    pass
                except TypeError as e:
                    #print("[INFO] TypeError wf variables: %s" % e)
                    pass

            if len(baseresult) == 0:
                try:
                    for variable in execution_data["execution_variables"]:
                        variablename = variable["name"].replace(" ", "_", -1).lower()
                        if variablename.lower() == actionname_lower:
                            baseresult = variable["value"]
                            break
                except KeyError as e:
                    #print("[INFO] KeyError exec variables: %s" % e)
                    pass
                except TypeError as e:
                    #print("[INFO] TypeError exec variables: %s" % e)
                    pass

    except KeyError as error:
        print(f"[DEBUG] KeyError in JSON: {error}")

    #print(f"[INFO] After first trycatch. Baseresult")#, baseresult)

    # 2. Find the JSON data
    # Returns if there isn't any JSON in the base ($nodename)
    if len(baseresult) == 0:
        return ""+appendresult, False

    #print("[INFO] After second return")
    # Returns if the result is JUST something like $nodename, not $nodename.value
    if len(parsersplit) == 1:
        returndata = str(baseresult)+str(appendresult)
        print("[DEBUG] RETURNING!")#: %s" % returndata)
        return returndata, False

    baseresult = baseresult.replace(" True,", " true,")
    baseresult = baseresult.replace(" False,", " false,")

    # Tries to actually read it as JSON with some stupid formatting
    #print("[INFO] After third parser return - Formatted")#, baseresult)
    basejson = {}
    try:
        basejson = json.loads(baseresult)
    except json.decoder.JSONDecodeError as e:
        try:
            baseresult = baseresult.replace("\'", "\"")
            basejson = json.loads(baseresult)
        except json.decoder.JSONDecodeError as e:
            print(f"[ERROR] Parser issue with JSON for {baseresult}: {e}")
            return str(baseresult)+str(appendresult), False

    print("[INFO] After fourth parser return as JSON")
    # Finds the ACTUAL value which is in the $nodename.value.test - focusing on value.test
    data, is_loop = recurse_json(basejson, parsersplit[1:])
    parseditem = data

    if isinstance(parseditem, dict) or isinstance(parseditem, list):
        try:
            parseditem = json.dumps(parseditem)
        except json.decoder.JSONDecodeError as e:
            print("[WARNING] Parseditem issue: %s" % e)
            pass

    if is_loop:
        print("[DEBUG] DATA IS A LOOP - SHOULD WRAP")
        if parsersplit[-1] == "#":
            print("[WARNING] SET DATA WRAPPER TO NORMAL!")
            parseditem = "${SHUFFLE_NO_SPLITTER%s}$" % json.dumps(data)
        else:
            # Return value: ${id[12345, 45678]}$
            print("[WARNING] SET DATA WRAPPER TO %s!" % parsersplit[-1])
            parseditem = "${%s%s}$" % (parsersplit[-1], json.dumps(data))

    returndata = str(parseditem)+str(appendresult)

    # New in 0.8.97: Don't return items without lists
    #self.logger.info("RETURNDATA: %s" % returndata)
    #return returndata, is_loop

    # 0.9.70:
    # The {} and [] checks are required because e.g. 7e7 is valid JSON for some reason...
    # This breaks EVERYTHING
    try:
        if (returndata.startswith("{") and returndata.endswith("}")) or (returndata.startswith("[") and returndata.endswith("]")):
            return json.dumps(json.loads(returndata)), is_loop
        else:
            return returndata, is_loop
    except json.decoder.JSONDecodeError as e:
        return returndata, is_loop

# Sending self as it's not a normal function
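# Example (sketch, using a standard Liquid filter):
#   parse_liquid('{{ "hello" | upcase }}', self) -> "HELLO"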
def parse_liquid(template, self):

    error = False
    error_msg = ""
    try:
        if len(template) > 10000000:
            self.logger.info("[DEBUG] Skipping liquid - size too big (%d)" % len(template))
            return template

        if "${" in template and "}$" in template:
            self.logger.info("[DEBUG] Shuffle loop shouldn't run in liquid. Data length: %d" % len(template))
            return template

        #if not "{{" in template or not "}}" in template:
        #    if not "{%" in template or not "%}" in template:
        #        self.logger.info("Skipping liquid - missing {{ }} and {% %}")
        #        return template

        #if not "{{" in template or not "}}" in template:
        #    return template

        #self.logger.info(globals())
        #if len(template) > 100:
        #    self.logger.info("[DEBUG] Running liquid with data of length %d" % len(template))
        #self.logger.info(f"[DEBUG] Data: {template}")

        all_globals = globals()
        all_globals["self"] = self
        run = Liquid(template, mode="wild", from_file=False, filters=shuffle_filters.filters, globals=all_globals)

        # Add locals that are missing to globals
        ret = run.render()
        return ret
    except jinja2.exceptions.TemplateNotFound as e:
        self.logger.info(f"[ERROR] Liquid Template error: {e}")
        error = True
        error_msg = e

        self.action["parameters"].append({
            "name": "liquid_template_error",
            "value": f"There was a Liquid input error (1). Details: {e}",
        })

        self.action_result["action"] = self.action
    except SyntaxError as e:
        self.logger.info(f"[ERROR] Liquid Syntax error: {e}")
        error = True
        error_msg = e

        self.action["parameters"].append({
            "name": "liquid_python_syntax_error",
            "value": f"There was a syntax error in your Liquid input (2). Details: {e}",
        })

        self.action_result["action"] = self.action
    except IndentationError as e:
        self.logger.info(f"[ERROR] Liquid IndentationError: {e}")
        error = True
        error_msg = e

        self.action["parameters"].append({
            "name": "liquid_indentation_error",
            "value": f"There was an indentation error in your Liquid input (2). Details: {e}",
        })

        self.action_result["action"] = self.action
    except jinja2.exceptions.TemplateSyntaxError as e:
        self.logger.info(f"[ERROR] Liquid Syntax error: {e}")
        error = True
        error_msg = e

        self.action["parameters"].append({
            "name": "liquid_syntax_error",
            "value": f"There was a syntax error in your Liquid input (2). Details: {e}",
        })

        self.action_result["action"] = self.action
    except json.decoder.JSONDecodeError as e:
        self.logger.info(f"[ERROR] Liquid JSON Syntax error: {e}")

        # Collapse triple-quoted blocks onto single lines and retry
        replace = False
        skip_next = False
        newlines = []
        thisline = []
        for line in template.split("\n"):
            #print("LINE: %s" % repr(line))
            if "\"\"\"" in line or "\'\'\'" in line:
                if replace:
                    skip_next = True
                else:
                    replace = not replace

            if replace == True:
                thisline.append(line)
                if skip_next == True:
                    if len(thisline) > 0:
                        #print(thisline)
                        newlines.append(" ".join(thisline))
                        thisline = []

                    replace = False
            else:
                newlines.append(line)

        new_template = "\n".join(newlines)
        if new_template != template:
            #check_template(new_template)
            return parse_liquid(new_template, self)
        else:
            error = True
            error_msg = e

            self.action["parameters"].append({
                "name": "liquid_json_error",
                "value": f"There was a syntax error in your input JSON (2). This is typically an issue with escaping newlines. Details: {e}",
            })

            self.action_result["action"] = self.action
    except TypeError as e:
        try:
            if "string as left operand" in f"{e}":
                #print(f"HANDLE REPLACE: {template}")
                split_left = template.split("|")
                if len(split_left) < 2:
                    return template

                splititem = split_left[0]
                additem = "{{"
                if "{{" in splititem:
                    splititem = splititem.replace("{{", "", -1)

                if "{%" in splititem:
                    splititem = splititem.replace("{%", "", -1)
                    additem = "{%"

                splititem = "%s \"%s\"" % (additem, splititem.strip())
                parsed_template = template.replace(split_left[0], splititem)
                run = Liquid(parsed_template, mode="wild", from_file=False)
                return run.render(**globals())

        except Exception as sub_e:
            print(f"SubError in Liquid: {sub_e}")

            self.action["parameters"].append({
                "name": "liquid_general_error",
                "value": f"There was a general error in your Liquid input (2). Details: {sub_e}",
            })

            self.action_result["action"] = self.action
            #return template

        self.logger.info(f"[ERROR] Liquid TypeError error: {e}")
        error = True
        error_msg = e

    except Exception as e:
        self.logger.info(f"[ERROR] General exception for liquid: {e}")
        error = True
        error_msg = e

        self.action["parameters"].append({
            "name": "liquid_general_exception",
            "value": f"There was a general exception in your Liquid input (2). Details: {e}",
        })

        self.action_result["action"] = self.action

    if "fmt" in str(error_msg) and "liquid_date" in str(error_msg):
        return template

    self.logger.info("Done in liquid")
    if error == True:
        self.action_result["status"] = "FAILURE"
        data = {
            "success": False,
            "reason": f"Failed to parse LiquidPy: {error_msg}",
            "input": template,
        }

        try:
            self.action_result["result"] = json.dumps(data)
        except Exception as e:
            self.action_result["result"] = f"Failed to parse LiquidPy: {error_msg}"
            print("[WARNING] Failed to set LiquidPy result")

        self.action_result["completed_at"] = int(time.time())
        self.send_result(self.action_result, headers, stream_path)

        self.logger.info(f"[ERROR] Sent FAILURE response to backend due to: {error_msg}")

        if runtime == "run":
            return template
        else:
            os._exit(1)

    return template

# Suboptimal cleanup script for body parsing of OpenAPI.
# Should have a regex which looks for the value, then goes out and cleans up the key.
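# Example (sketch): empty values and leftover ${...} placeholders are dropped,
# and values that parse as JSON are re-parsed:
#   recurse_cleanup_script('{"a": "", "b": "1"}') -> '{"b": 1}'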
def recurse_cleanup_script(data):
    deletekeys = []
    newvalue = data
    try:
        if not isinstance(data, dict):
            newvalue = json.loads(data)
        else:
            newvalue = data

        for key, value in newvalue.items():
            if isinstance(value, str) and len(value) == 0:
                deletekeys.append(key)
                continue

            if isinstance(value, list):
                try:
                    value = json.dumps(value)
                except:
                    print("[WARNING] Json parsing issue in recursed value")
                    pass

            if value == "${%s}" % key:
                print("[WARNING] Deleting %s because key = value" % key)
                deletekeys.append(key)
                continue
            elif "${" in value and "}" in value:
                print("[WARNING] Deleting %s because it contains ${ and }" % key)
                deletekeys.append(key)
                continue

            if isinstance(value, dict):
                newvalue[key] = recurse_cleanup_script(value)

    except json.decoder.JSONDecodeError as e:
        # Since the data isn't JSON compatible at all here..?
        # Seems to happen with newlines in variables being parsed in as strings?
        print(f"[ERROR] Failed JSON replacement for OpenAPI keys (3) {e}. Value: {data}")
    except Exception as e:
        print(f"[ERROR] Failed as an exception (1): {e}")

    try:
        for deletekey in deletekeys:
            try:
                del newvalue[deletekey]
            except:
                pass
    except Exception as e:
        print(f"[WARNING] Failed in deletekeys: {e}")
        return data

    try:
        for key, value in newvalue.items():
            if isinstance(value, bool):
                continue
            elif isinstance(value, dict) and not bool(value):
                continue

            try:
                value = json.loads(value)
                newvalue[key] = value
            except json.decoder.JSONDecodeError as e:
                continue
            except Exception as e:
                continue

        try:
            data = json.dumps(newvalue)
        except json.decoder.JSONDecodeError as e:
            print("[WARNING] JsonDecodeError: %s" % e)
            data = newvalue

    except json.decoder.JSONDecodeError as e:
        print(f"[WARNING] Failed JSON replacement for OpenAPI keys (2) {e}")
    except Exception as e:
        print(f"[WARNING] Failed as an exception (2): {e}")

    return data

# Parses parameters sent to it and returns whether it succeeded, along with the values found
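# Example (sketch, assuming execution_argument is '{"name": "Fred"}'):
# a parameter value of "$exec.name" comes back as ("", "Fred", False).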
def parse_params(action, fullexecution, parameter, self):
    # Skip if it starts with $?
    jsonparsevalue = "$."
    is_loop = False

    # Matches with space in the first part, but not in subsequent parts.
    # JSON / yaml etc. shouldn't have spaces in their fields anyway.
    #match = ".*?([$]{1}([a-zA-Z0-9 _-]+\.?){1}([a-zA-Z0-9#_-]+\.?){0,})[$/, ]?"
    #match = ".*?([$]{1}([a-zA-Z0-9 _-]+\.?){1}([a-zA-Z0-9#_-]+\.?){0,})"

    #match = ".*?([$]{1}([a-zA-Z0-9_-]+\.?){1}([a-zA-Z0-9#_-]+\.?){0,})" # Removed space - no longer ok. Force underscore.
    match = r"([$]{1}([a-zA-Z0-9_-]+\.?){1}([a-zA-Z0-9#_-]+\.?){0,})" # Removed .*? to make it work with large amounts of data

    # Extra replacements for certain scenarios
    escaped_dollar = "\\$"
    escape_replacement = "\\%\\%\\%\\%\\%"
    end_variable = "^_^"

    #self.logger.info("Input value: %s" % parameter["value"])
    try:
        parameter["value"] = parameter["value"].replace(escaped_dollar, escape_replacement, -1)
    except:
        self.logger.info("Error in initial replacement of escaped dollar!")

    paramname = ""
    try:
        paramname = parameter["name"]
    except:
        pass

    # Basic fix in case variant isn't set (the self-assignment triggers a KeyError if missing)
    # Variant is ALWAYS STATIC_VALUE from mid 2021~
    try:
        #self.logger.info(f"[DEBUG] Parameter '{paramname}' of length {len(parameter['value'])}")
        parameter["variant"] = parameter["variant"]
    except:
        parameter["variant"] = "STATIC_VALUE"

    # Regex to find all the things
    # Should just go in here if data is ... not so big
    #if parameter["variant"] == "STATIC_VALUE" and len(parameter["value"]) < 1000000:
    #if parameter["variant"] == "STATIC_VALUE" and len(parameter["value"]) < 5000000:
    if parameter["variant"] == "STATIC_VALUE":
        data = parameter["value"]
        actualitem = re.findall(match, data, re.MULTILINE)
        #self.logger.debug(f"\n\nHandle static data with JSON: {data}\n\n")
        #self.logger.info("STATIC PARSED: %s" % actualitem)
        #self.logger.info("[INFO] Done with regex matching")
        if len(actualitem) > 0:
            for replace in actualitem:
                try:
                    to_be_replaced = replace[0]
                except IndexError:
                    continue

                # Handles for loops etc.
                # FIXME: Should it dump to string here? Doesn't that defeat the purpose?
                # Trying without string dumping.

                value, is_loop = get_json_value(fullexecution, to_be_replaced)
                #self.logger.info(f"\n\nType of value: {type(value)}")
                if isinstance(value, str):
                    parameter["value"] = parameter["value"].replace(to_be_replaced, value)
                elif isinstance(value, dict) or isinstance(value, list):
                    # Changed from JSON dump to str() 28.05.2021
                    # This makes it so the parameters get lists and dicts straight up
                    parameter["value"] = parameter["value"].replace(to_be_replaced, json.dumps(value))

                #try:
                #    parameter["value"] = parameter["value"].replace(to_be_replaced, str(value))
                #except:
                #    parameter["value"] = parameter["value"].replace(to_be_replaced, json.dumps(value))
                #    self.logger.info("Failed parsing value as string?")
                else:
                    self.logger.info("[WARNING] Unknown type %s" % type(value))
                    try:
                        parameter["value"] = parameter["value"].replace(to_be_replaced, json.dumps(value))
                    except json.decoder.JSONDecodeError as e:
                        parameter["value"] = parameter["value"].replace(to_be_replaced, value)

            #self.logger.info("VALUE: %s" % parameter["value"])
        else:
            self.logger.info(f"[DEBUG] No $-references to replace in value of length {len(parameter['value'])}")

    if parameter["variant"] == "WORKFLOW_VARIABLE":
        self.logger.info("[DEBUG] Handling workflow variable")
        found = False
        try:
            for item in fullexecution["workflow"]["workflow_variables"]:
                if parameter["action_field"] == item["name"]:
                    found = True
                    parameter["value"] = item["value"]
                    break
        except KeyError as e:
            self.logger.info("KeyError WF variable 1: %s" % e)
            pass
        except TypeError as e:
            self.logger.info("TypeError WF variables 1: %s" % e)
            pass

        if not found:
            try:
                for item in fullexecution["execution_variables"]:
                    if parameter["action_field"] == item["name"]:
                        parameter["value"] = item["value"]
                        break
            except KeyError as e:
                self.logger.info("KeyError WF variable 2: %s" % e)
                pass
            except TypeError as e:
                self.logger.info("TypeError WF variables 2: %s" % e)
                pass

    elif parameter["variant"] == "ACTION_RESULT":
        # FIXME - calculate value based on action_field and $if prominent
        # FIND THE RIGHT LABEL
        # GET THE LABEL'S RESULT

        tmpvalue = ""
        self.logger.info("ACTION FIELD: %s" % parameter["action_field"])

        fullname = "$"
        if parameter["action_field"] == "Execution Argument":
            tmpvalue = fullexecution["execution_argument"]
            fullname += "exec"
        else:
            fullname += parameter["action_field"]

        self.logger.info("PRE Fullname: %s" % fullname)

        if parameter["value"].startswith(jsonparsevalue):
            fullname += parameter["value"][1:]
        #else:
        #    fullname = "$%s" % parameter["action_field"]

        self.logger.info("Fullname: %s" % fullname)
        actualitem = re.findall(match, fullname, re.MULTILINE)
        self.logger.info("ACTION PARSED: %s" % actualitem)
        if len(actualitem) > 0:
            for replace in actualitem:
                try:
                    to_be_replaced = replace[0]
                except IndexError as e:
                    self.logger.info("Nothing to replace?: %s" % e)
                    continue

                # This will never be a loop aka multi argument
                parameter["value"] = to_be_replaced

                value, is_loop = get_json_value(fullexecution, to_be_replaced)
                self.logger.info("Loop: %s" % is_loop)
                if isinstance(value, str):
                    parameter["value"] = parameter["value"].replace(to_be_replaced, value)
                elif isinstance(value, dict):
                    parameter["value"] = parameter["value"].replace(to_be_replaced, json.dumps(value))
                else:
                    self.logger.info("Unknown type %s" % type(value))
                    try:
                        parameter["value"] = parameter["value"].replace(to_be_replaced, json.dumps(value))
                    except json.decoder.JSONDecodeError as e:
                        parameter["value"] = parameter["value"].replace(to_be_replaced, value)

    #self.logger.info("PRE Replaced data: %s" % parameter["value"])

    try:
        parameter["value"] = parameter["value"].replace(end_variable, "", -1)
        parameter["value"] = parameter["value"].replace(escape_replacement, "$", -1)
    except Exception as e:
        self.logger.info(f"[ERROR] Problem in datareplacement: {e}")

    # Just here in case it breaks
    # Implemented 02.08.2021
    #self.logger.info("Pre liquid: %s" % parameter["value"])
    try:
        parameter["value"] = parse_liquid(parameter["value"], self)
    except:
        pass

    return "", parameter["value"], is_loop
def run_validation(sourcevalue, check, destinationvalue):
    print("[DEBUG] Checking %s %s %s" % (sourcevalue, check, destinationvalue))

    if check == "=" or check.lower() == "equals":
        if str(sourcevalue).lower() == str(destinationvalue).lower():
            return True
    elif check == "!=" or check.lower() == "does not equal":
        if str(sourcevalue).lower() != str(destinationvalue).lower():
            return True
    elif check.lower() == "startswith":
        if str(sourcevalue).lower().startswith(str(destinationvalue).lower()):
            return True
    elif check.lower() == "endswith":
        if str(sourcevalue).lower().endswith(str(destinationvalue).lower()):
            return True
    elif check.lower() == "contains":
        if destinationvalue.lower() in sourcevalue.lower():
            return True

    elif check.lower() == "is empty":
        if len(str(sourcevalue)) == 0:
            return True

        if str(sourcevalue) == "0":
            return True

        return False

    elif check.lower() == "contains_any_of":
        newvalue = [destinationvalue.lower()]
        if "," in destinationvalue:
            newvalue = destinationvalue.split(",")
        elif ", " in destinationvalue:
            newvalue = destinationvalue.split(", ")

        for item in newvalue:
            if not item:
                continue

            if item.strip() in sourcevalue:
                print("[INFO] Found %s in %s" % (item, sourcevalue))
                return True

        return False
    elif check.lower() == "larger than" or check.lower() == "bigger than":
        try:
            if str(sourcevalue).isdigit() and str(destinationvalue).isdigit():
                if int(sourcevalue) > int(destinationvalue):
                    return True

        except AttributeError as e:
            print("[WARNING] Condition larger than failed with values %s and %s: %s" % (sourcevalue, destinationvalue, e))
            return False
    elif check.lower() == "smaller than" or check.lower() == "less than":
        try:
            if str(sourcevalue).isdigit() and str(destinationvalue).isdigit():
                if int(sourcevalue) < int(destinationvalue):
                    return True

        except AttributeError as e:
            print("[WARNING] Condition smaller than failed with values %s and %s: %s" % (sourcevalue, destinationvalue, e))
            return False
    elif check.lower() == "re" or check.lower() == "matches regex":
        try:
            found = re.search(destinationvalue, sourcevalue)
        except re.error as e:
            print("[WARNING] Regex error in condition: %s" % e)
            return False

        if found == None:
            return False

        return True
    else:
        print("[DEBUG] Condition: can't handle %s yet. Returning false" % check)

    return False

def check_branch_conditions(action, fullexecution, self):
    # relevantbranches = workflow.branches where destination = action
    try:
        if fullexecution["workflow"]["branches"] == None or len(fullexecution["workflow"]["branches"]) == 0:
            return True, ""
    except KeyError:
        return True, ""

    # Startnode should always run - no need to check incoming
    try:
        if action["id"] == fullexecution["start"]:
            return True, ""
    except Exception as error:
        self.logger.info(f"[WARNING] Failed checking startnode: {error}")
        return True, ""

    available_checks = [
        "=",
        "equals",
        "!=",
        "does not equal",
        ">",
        "larger than",
        "<",
        "less than",
        ">=",
        "<=",
        "startswith",
        "endswith",
        "contains",
        "contains_any_of",
        "re",
        "matches regex",
    ]

    relevantbranches = []
    correct_branches = 0
    matching_branches = 0
    for branch in fullexecution["workflow"]["branches"]:
        if branch["destination_id"] != action["id"]:
            continue

        matching_branches += 1

        # Find if the previous node is skipped or failed. Skipped != correct branch
        try:
            should_skip = False
            for res in fullexecution["results"]:
                if res["action"]["id"] == branch["source_id"]:
                    if res["status"] == "FAILURE" or res["status"] == "SKIPPED":
                        should_skip = True

                    break

            if should_skip:
                continue
        except Exception as e:
            self.logger.info("[WARNING] Failed handling check of whether the parent is skipped: %s" % e)

        # Branches without conditions always count as correct
        try:
            if branch["conditions"] == None or len(branch["conditions"]) == 0:
                correct_branches += 1
                continue
        except KeyError:
            correct_branches += 1
            continue

        successful_conditions = 0
        total_conditions = len(branch["conditions"])
        for condition in branch["conditions"]:
            self.logger.info("[DEBUG] Getting condition value of %s" % condition)

            # Parse all values first here
            sourcevalue = condition["source"]["value"]
            check, sourcevalue, is_loop = parse_params(action, fullexecution, condition["source"], self)
            if check:
                continue

            sourcevalue = parse_wrapper_start(sourcevalue, self)
            destinationvalue = condition["destination"]["value"]

            check, destinationvalue, is_loop = parse_params(action, fullexecution, condition["destination"], self)
            if check:
                continue

            destinationvalue = parse_wrapper_start(destinationvalue, self)

            if not condition["condition"]["value"] in available_checks:
                self.logger.warning("Skipping %s %s %s because %s is invalid." % (sourcevalue, condition["condition"]["value"], destinationvalue, condition["condition"]["value"]))
                continue

            # Configuration = negated because of WorkflowAppActionParam..
            validation = run_validation(sourcevalue, condition["condition"]["value"], destinationvalue)
            try:
                if condition["condition"]["configuration"]:
                    validation = not validation
            except KeyError:
                pass

            if validation == True:
                successful_conditions += 1

        if total_conditions == successful_conditions:
            correct_branches += 1

    if matching_branches == 0:
        return True, ""

    if matching_branches > 0 and correct_branches > 0:
        return True, ""

    self.logger.info("[DEBUG] Correct branches vs matching branches: %d vs %d" % (correct_branches, matching_branches))
    return False, {"success": False, "reason": "Minimum of one branch's conditions must be correct to continue. Total: %d of %d" % (correct_branches, matching_branches)}

#
#
#
#
# CONT
#
#
#
#

# THE START IS ACTUALLY RIGHT HERE :O
# Checks whether conditions are met, otherwise skips the action
branchcheck, tmpresult = check_branch_conditions(action, fullexecution, self)
if isinstance(tmpresult, (list, dict)):
    #self.logger.info("[DEBUG] Fixing branch return as object -> string")
    try:
        #tmpresult = tmpresult.replace("'", "\"")
        tmpresult = json.dumps(tmpresult)
    except json.decoder.JSONDecodeError as e:
        self.logger.info(f"[WARNING] Failed condition parsing {tmpresult} to string")

# IF branches fail: Exit!
if not branchcheck:
    self.logger.info("Failed one or more branch conditions.")
    self.action_result["result"] = tmpresult
    self.action_result["status"] = "SKIPPED"
    self.action_result["completed_at"] = int(time.time())

    self.send_result(self.action_result, headers, stream_path)
    return

# Replace name cus there might be issues
# Not doing lower() as there might be user-made functions
actionname = action["name"]
if " " in actionname:
    actionname = actionname.replace(" ", "_", -1)

#print("ACTION: ", action)
#print("exec: ", self.full_execution)
#if action.generated:
#    actionname = actionname.lower()

# Runs the actual functions
try:
    func = getattr(self, actionname, None)
    if func == None:
        self.logger.debug(f"[DEBUG] Failed executing {actionname} because func is None (no function specified).")
        self.action_result["status"] = "FAILURE"
        self.action_result["result"] = json.dumps({
            "success": False,
            "reason": f"Function {actionname} doesn't exist, or the App is out of date.",
            "details": "If this persists, please delete the Docker image locally, restart your Orborus instance and try again to force-download the latest version. Contact support@shuffler.io with this data if the issue persists.",
        })
    elif callable(func):
        try:
            if len(action["parameters"]) < 1:
                #result = await func()
                result = func()
            else:
                # Potentially parse JSON here
                # FIXME - add potential authentication as first parameter(s) here
                # params[parameter["name"]] = parameter["value"]
                #self.logger.info(fullexecution["authentication"]
                # What variables are necessary here tho hmm

                params = {}
                # This replacement should happen in the backend as part of params
                # error log is useless
                #try:
                #    for item in action["authentication"]:
                #        self.logger.info("AUTH PARAM: ", key, value)
                #        #params[item["key"]] = item["value"]
                #except KeyError as e:
                #    self.logger.info(f"[WARNING] No authentication specified! Is this correct? err: {e}")

                # Fixes OpenAPI body parameters for later.
                newparams = []
                counter = -1
                bodyindex = -1
                for parameter in action["parameters"]:
                    counter += 1

                    # Hack for key:value in options using ||
                    try:
                        if parameter["options"] != None and len(parameter["options"]) > 0:
                            #self.logger.info(f'OPTIONS: {parameter["options"]}')
                            #self.logger.info(f'OPTIONS VAL: {parameter}')
                            if "||" in parameter["value"]:
                                splitvalue = parameter["value"].split("||")
                                if len(splitvalue) > 1:
                                    #self.logger.info(f'[INFO] Parsed split || options of actions["parameters"]["name"]')
                                    action["parameters"][counter]["value"] = splitvalue[1]

                    except (IndexError, KeyError, TypeError) as e:
                        self.logger.info(f"[WARNING] Options err: {e}")

                    # This part is purely for OpenAPI accessibility.
                    # It replaces the data back into the main item.
                    # Earlier, we handled each of the items and did later string replacement,
                    # but this has changed to do lists within items and such
                    if parameter["name"] == "body":
                        bodyindex = counter
                        #self.logger.info("PARAM: %s" % parameter)

                        # FIXME: This should also happen after liquid & param parsing..
                        try:
                            values = parameter["value_replace"]
                            if values != None:
                                added = 0
                                for val in values:
                                    replace_value = val["value"]
                                    replace_key = val["key"]

                                    if (val["value"].startswith("{") and val["value"].endswith("}")) or (val["value"].startswith("[") and val["value"].endswith("]")):
                                        self.logger.info(f"""Trying to parse as JSON: {val["value"]}""")
                                        try:
                                            newval = val["value"]

                                            # If it gets here, remove the "" in front of and behind the key as well,
                                            # since this is preventing the JSON from being loaded
                                            tmpvalue = json.loads(newval)
                                            replace_key = f"\"{replace_key}\""
                                        except json.decoder.JSONDecodeError as e:
                                            self.logger.info("[WARNING] Failed JSON replacement for OpenAPI %s", val["key"])

                                    elif val["value"].lower() == "true" or val["value"].lower() == "false":
                                        replace_key = f"\"{replace_key}\""
                                    else:
                                        if "\"" in replace_value and not "\\\"" in replace_value:
                                            replace_value = replace_value.replace("\"", "\\\"", -1)

                                    action["parameters"][counter]["value"] = action["parameters"][counter]["value"].replace(replace_key, replace_value, 1)

                                    self.logger.info(f'[INFO] Added param {val["key"]} for body (using OpenAPI)')
                                    added += 1

                                    #action["parameters"]["body"]

                                self.logger.info("ADDED %d parameters for body" % added)
                        except KeyError as e:
                            self.logger.info("KeyError body OpenAPI: %s" % e)
                            pass
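
                        # Example (sketch, assuming placeholders like ${title} in the body):
                        # with body '{"title": "${title}"}' and a value_replace entry
                        # {"key": "${title}", "value": "hello"}, the body becomes
                        # '{"title": "hello"}'.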

                        self.logger.info(f"""HANDLING BODY: {action["parameters"][counter]["value"]}""")
                        action["parameters"][counter]["value"] = recurse_cleanup_script(action["parameters"][counter]["value"])

                #self.logger.info(action["parameters"])

                # This seems redundant now
                for parameter in newparams:
                    action["parameters"].append(parameter)

                self.action = action

                # Saved here due to them being overwritten, but still useful later
                try:
                    self.original_action = json.loads(json.dumps(action))
                except Exception as e:
                    self.logger.info(f"[ERROR] Failed parsing action as JSON to original action. This COULD have bad effects on LOOPED executions: {e}")

                # calltimes is used to handle forloops in the app itself.
                # 2 kinds of loop - one in the GUI with one app each, and one like this,
                # which is super fast, but has a bad overview (potentially good tho)
                calltimes = 1
                result = ""

                all_executions = []

                # multi_parameters has the data for each variable
                minlength = 0
                multi_parameters = json.loads(json.dumps(params))
                multiexecution = False
                multi_execution_lists = []
                remove_params = []
                for parameter in action["parameters"]:
                    check, value, is_loop = parse_params(action, fullexecution, parameter, self)
                    if check:
                        raise Exception(json.dumps({
                            "success": False,
                            "reason": f"Parameter {parameter} has an issue",
                            "exception": f"Value Error: {check}",
                        }))

                    if parameter["name"] == "body":
                        self.logger.info(f"[INFO] Should debug field with liquid and other checks as it's BODY: {value}")

                    # Custom format for ${name[0,1,2,...]}$
                    #submatch = "([${]{2}([0-9a-zA-Z_-]+)(\[.*\])[}$]{2})"
                    #self.logger.info(f"Returnedvalue: {value}")
                    # OLD: Used until 13.03.2021: submatch = "([${]{2}#?([0-9a-zA-Z_-]+)#?(\[.*\])[}$]{2})"
                    # \${[0-9a-zA-Z_-]+#?(\[.*?]}\$)
                    submatch = r"([${]{2}#?([0-9a-zA-Z_-]+)#?(\[.*?]}\$))"
                    actualitem = re.findall(submatch, value, re.MULTILINE)
                    try:
                        if action["skip_multicheck"]:
                            self.logger.info("Skipping multicheck")
                            actualitem = []
                    except KeyError:
                        pass

                    #self.logger.info("Return value: %s" % value)
                    actionname = action["name"]
                    #self.logger.info("Multicheck ", actualitem)
                    #self.logger.info("ITEM LENGTH: %d, Actual item: %s" % (len(actualitem), actualitem))
                    if len(actualitem) > 0:
                        multiexecution = True

                        # Loops WITHOUT JSON variables go here.
                        # Loops WITH variables go in the else below.
                        self.logger.info("Before first part in multiexec!")
                        handled = False

                        # Has a loop without a variable used inside
                        if len(actualitem[0]) > 2 and actualitem[0][1] == "SHUFFLE_NO_SPLITTER":
                            self.logger.info("(1) Pre replacement: %s" % actualitem[0][2])
                            tmpitem = value

                            index = 0
                            replacement = actualitem[index][2]
                            if replacement.endswith("}$"):
                                replacement = replacement[:-2]

                            if replacement.startswith("\"") and replacement.endswith("\""):
                                replacement = replacement[1:len(replacement)-1]

                            self.logger.info("POST replacement: %s" % replacement)

                            #json_replacement = tmpitem.replace(actualitem[index][0], replacement, 1)
                            #self.logger.info("AFTER POST replacement: %s" % json_replacement)
                            json_replacement = replacement
                            try:
                                json_replacement = json.loads(replacement)
                            except json.decoder.JSONDecodeError as e:
                                try:
                                    replacement = replacement.replace("\'", "\"", -1)
                                    json_replacement = json.loads(replacement)
                                except:
                                    self.logger.info("JSON error singular: %s" % e)

                            if len(json_replacement) > minlength:
                                minlength = len(json_replacement)

                            self.logger.info("PRE new_replacement")

                            new_replacement = []
                            for i in range(len(json_replacement)):
                                if isinstance(json_replacement[i], dict) or isinstance(json_replacement[i], list):
                                    tmp_replacer = json.dumps(json_replacement[i])
                                    newvalue = tmpitem.replace(str(actualitem[index][0]), str(tmp_replacer), 1)
                                else:
                                    newvalue = tmpitem.replace(str(actualitem[index][0]), str(json_replacement[i]), 1)

                                try:
                                    newvalue = parse_liquid(newvalue, self)
                                except Exception as e:
                                    self.logger.info(f"[WARNING] Failed liquid parsing in loop (1): {e}")

                                try:
                                    newvalue = json.loads(newvalue)
                                except json.decoder.JSONDecodeError as e:
                                    self.logger.info("DECODER ERROR: %s" % e)
                                    pass

                                new_replacement.append(newvalue)

                            self.logger.info("New replacement: %s" % new_replacement)

                            # FIXME: Should this use new_replacement?
                            tmpitem = tmpitem.replace(actualitem[index][0], replacement, 1)

                            # This code handles files.
                            resultarray = []
                            isfile = False
                            try:
                                self.logger.info("(1) ------------ PARAM: %s" % parameter["schema"]["type"])
                                if parameter["schema"]["type"] == "file" and len(value) > 0:
                                    self.logger.info("(1) SHOULD HANDLE FILE IN MULTI. Get based on value %s" % tmpitem)
                                    # This is silly :)
                                    # Q: Is there something wrong with the download system?
                                    # It seems to return "FILE CONTENT: %s" with the ID as %s
                                    for tmp_file_split in json.loads(tmpitem):
                                        self.logger.info("(1) PRE GET FILE %s" % tmp_file_split)
                                        file_value = self.get_file(tmp_file_split)
                                        self.logger.info("(1) POST AWAIT %s" % file_value)
                                        resultarray.append(file_value)
                                        self.logger.info("(1) FILE VALUE FOR VAL %s: %s" % (tmp_file_split, file_value))

                                    isfile = True
                            except NameError as e:
                                self.logger.info("(1) SCHEMA NAMEERROR IN FILE HANDLING: %s" % e)
                            except KeyError as e:
                                self.logger.info("(1) SCHEMA KEYERROR IN FILE HANDLING: %s" % e)
                            except json.decoder.JSONDecodeError as e:
                                self.logger.info("(1) JSON ERROR IN FILE HANDLING: %s" % e)

                            if not isfile:
                                self.logger.info("Resultarray (NOT FILE): %s" % resultarray)
                                params[parameter["name"]] = tmpitem
                                multi_parameters[parameter["name"]] = new_replacement
                            else:
                                self.logger.info("Resultarray (FILE): %s" % resultarray)
                                params[parameter["name"]] = resultarray
                                multi_parameters[parameter["name"]] = resultarray

                            #if len(resultarray) == 0:
                            #    self.logger.info("[WARNING] Returning empty array because the array length to be looped is 0 (1)")
                            #    action_result["status"] = "SUCCESS"
                            #    action_result["result"] = "[]"
                            #    self.send_result(action_result, headers, stream_path)
                            #    return

                            multi_execution_lists.append(new_replacement)
                            #self.logger.info("MULTI finished: %s" % json_replacement)
                        else:
                            self.logger.info(f"(2) Pre replacement (loop with variables). Variables: {actualitem}")
                            # This is here to handle for loops within variables.. kind of
                            # 1. Find the length of the longest array
                            # 2. Build an array with the base values based on parameter["value"]
                            # 3. Get the n'th value of the generated list from values
                            # 4. Execute all n answers
                            replacements = {}
                            curminlength = 0
                            for replace in actualitem:
                                try:
                                    to_be_replaced = replace[0]
                                    actualitem = replace[2]
                                    if actualitem.endswith("}$"):
                                        actualitem = actualitem[:-2]

                                except IndexError:
                                    self.logger.info("[WARNING] Indexerror")
                                    continue

                                #self.logger.info(f"\n\nTMPITEM: {actualitem}\n\n")
                                #actualitem = parse_wrapper_start(actualitem)
                                #self.logger.info(f"\n\nTMPITEM2: {actualitem}\n\n")

                                try:
                                    itemlist = json.loads(actualitem)
                                    if len(itemlist) > minlength:
                                        minlength = len(itemlist)

                                    if len(itemlist) > curminlength:
                                        curminlength = len(itemlist)

                                except json.decoder.JSONDecodeError as e:
                                    self.logger.info("JSON Error (replace): %s in %s" % (e, actualitem))

                                replacements[to_be_replaced] = actualitem

                            # Parses the data as string with length, split etc. before moving on.

                            #self.logger.info("In second part of else: %s" % (len(itemlist)))
                            # This is a result array for JUST this value..
                            # What if there are more?
                            resultarray = []
                            for i in range(0, curminlength):
                                tmpitem = json.loads(json.dumps(parameter["value"]))
                                for key, value in replacements.items():
                                    replacement = value
                                    try:
                                        replacement = json.dumps(json.loads(value)[i])
                                    except IndexError as e:
                                        self.logger.info(f"[ERROR] Failed handling value parsing with index: {e}")
                                        pass

                                    if replacement.startswith("\"") and replacement.endswith("\""):
                                        replacement = replacement[1:len(replacement)-1]
                                    #except json.decoder.JSONDecodeError as e:

                                    #self.logger.info("REPLACING %s with %s" % (key, replacement))
                                    #replacement = parse_wrapper_start(replacement)
                                    tmpitem = tmpitem.replace(key, replacement, -1)
                                    try:
                                        tmpitem = parse_liquid(tmpitem, self)
                                    except Exception as e:
                                        self.logger.info(f"[WARNING] Failed liquid parsing in loop (2): {e}")

                                # This code handles files.
                                isfile = False
                                try:
                                    self.logger.info("(2) ------------ PARAM: %s" % parameter["schema"]["type"])
                                    if parameter["schema"]["type"] == "file" and len(value) > 0:
                                        self.logger.info("(2) SHOULD HANDLE FILE IN MULTI. Get based on value %s" % parameter["value"])

                                        for tmp_file_split in json.loads(parameter["value"]):
                                            self.logger.info("(2) PRE GET FILE %s" % tmp_file_split)
                                            file_value = self.get_file(tmp_file_split)
                                            self.logger.info("(2) POST AWAIT %s" % file_value)
                                            resultarray.append(file_value)
                                            self.logger.info("(2) FILE VALUE FOR VAL %s: %s" % (tmp_file_split, file_value))

                                        isfile = True
                                except KeyError as e:
                                    self.logger.info("(2) SCHEMA ERROR IN FILE HANDLING: %s" % e)
                                except json.decoder.JSONDecodeError as e:
                                    self.logger.info("(2) JSON ERROR IN FILE HANDLING: %s" % e)

                                if not isfile:
                                    tmpitem = tmpitem.replace("\\\\", "\\", -1)
                                    resultarray.append(tmpitem)

                            # With this parameter ready, add it to... a greater list of parameters. Rofl
                            self.logger.info("LENGTH OF ARR: %d" % len(resultarray))
                            if len(resultarray) == 0:
                                self.logger.info("[WARNING] Returning empty array because the array length to be looped is 0 (0)")
                                self.action_result["status"] = "SUCCESS"
                                self.action_result["result"] = "[]"
                                self.send_result(self.action_result, headers, stream_path)
                                return

                            #self.logger.info("RESULTARRAY: %s" % resultarray)
                            if resultarray not in multi_execution_lists:
                                multi_execution_lists.append(resultarray)

                            multi_parameters[parameter["name"]] = resultarray
                    else:
                        # Parses things like int(value)
                        #self.logger.info("[DEBUG] Normal parsing (not looping)")#with data %s" % value)
                        # This part has messed up so many random JSON usages because of weird parenthesis parsing

                        value = parse_wrapper_start(value, self)

                        try:
                            if str(value).startswith("b'") and str(value).endswith("'"):
                                value = value[2:-1]
                        except Exception as e:
                            print(f"Value rawbytes Exception: {e}")

                        params[parameter["name"]] = value
                        multi_parameters[parameter["name"]] = value

                        # This code handles files.
                        try:
                            if parameter["schema"]["type"] == "file" and len(value) > 0:
                                self.logger.info("\n SHOULD HANDLE FILE. Get based on value %s. <--- is this a valid ID?" % parameter["value"])
                                file_value = self.get_file(value)
                                self.logger.info("FILE VALUE: %s \n" % file_value)

                                params[parameter["name"]] = file_value
                                multi_parameters[parameter["name"]] = file_value
                        except KeyError as e:
                            self.logger.info("SCHEMA ERROR IN FILE HANDLING: %s" % e)

                #remove_params.append(parameter["name"])
                # Fix lists here
                # FIXME: This doesn't really do anything anymore
                self.logger.info("[DEBUG] CHECKING multi execution list: %d!" % len(multi_execution_lists))
                if len(multi_execution_lists) > 0:
                    self.logger.info("\n [DEBUG] Multi execution list has more data: %d" % len(multi_execution_lists))
                    filteredlist = []
                    for listitem in multi_execution_lists:
                        if listitem in filteredlist:
                            continue

                        # FIXME: Subsub required? Recursion!
                        # Basically multiply what we have with the outer loop?
                        #
                        #if isinstance(listitem, list):
                        #    for subitem in listitem:
                        #        filteredlist.append(subitem)
                        #else:
                        #    filteredlist.append(listitem)

                    #self.logger.info("New list length: %d" % len(filteredlist))
                    if len(filteredlist) > 1:
                        self.logger.info(f"Calculating new multi-loop length with {len(filteredlist)} lists")
                        tmplength = 1
                        for innerlist in filteredlist:
                            tmplength = len(innerlist)*tmplength
                            self.logger.info("List length: %d. %d*%d" % (tmplength, len(innerlist), tmplength))

                        minlength = tmplength

                        self.logger.info("New multi execution length: %d\n" % tmplength)
|
|
|
|
# Cleaning up extra list params
|
|
for subparam in remove_params:
|
|
#self.logger.info(f"DELETING {subparam}")
|
|
try:
|
|
del params[subparam]
|
|
except:
|
|
pass
|
|
#self.logger.info(f"Error with subparam deletion of {subparam} in {params}")
|
|
try:
|
|
del multi_parameters[subparam]
|
|
except:
|
|
#self.logger.info(f"Error with subparam deletion of {subparam} in {multi_parameters} (2)")
|
|
pass
|
|
|
|
#self.logger.info()
|
|
#self.logger.info(f"Param: {params}")
|
|
#self.logger.info(f"Multiparams: {multi_parameters}")
|
|
#self.logger.info()
|
|
|
|
if not multiexecution:
|
|
# Runs a single iteration here
|
|
new_params = self.validate_unique_fields(params)
|
|
self.logger.info(f"[DEBUG] Returned with newparams of length {len(new_params)}")
|
|
if isinstance(new_params, list) and len(new_params) == 1:
|
|
params = new_params[0]
|
|
else:
|
|
self.logger.info("[WARNING] SHOULD STOP EXECUTION BECAUSE FIELDS AREN'T UNIQUE")
|
|
self.action_result["status"] = "SKIPPED"
|
|
self.action_result["result"] = f"A non-unique value was found"
|
|
self.action_result["completed_at"] = int(time.time())
|
|
self.send_result(self.action_result, headers, stream_path)
|
|
return
|
|
|
|
self.logger.info("[INFO] Running normal execution (not loop)\n\n")
|
|
|
|
                        # Added literal evaluation of anything resembling a string
                        # The goal is to parse objects that e.g. use single quotes and the like
                        # FIXME: add this to Multi exec as well.
                        try:
                            # Iterate over a copy, since normalized keys may be added to params below
                            for key, value in list(params.items()):
                                if "-" in key:
                                    try:
                                        newkey = key.replace("-", "_", -1).lower()
                                        params[newkey] = params[key]
                                    except Exception as e:
                                        self.logger.info("[DEBUG] Failed updating key with dash in it: %s" % e)

                                try:
                                    if isinstance(value, str) and ((value.startswith("{") and value.endswith("}")) or (value.startswith("[") and value.endswith("]"))):
                                        params[key] = json.loads(value)
                                except Exception as e:
                                    try:
                                        if isinstance(value, str) and ((value.startswith("{") and value.endswith("}")) or (value.startswith("[") and value.endswith("]"))):
                                            params[key] = ast.literal_eval(value)
                                    except Exception as e:
                                        self.logger.info(f"[DEBUG] Failed parsing value with ast and json.loads - noncritical. Trying next: {e}")
                                        continue
                        except Exception as e:
                            self.logger.info(f"[DEBUG] Failed looping objects. Non critical: {e}")

                        # Uncomment below to get the param input
                        # self.logger.info(f"[DEBUG] PARAMS: {params}")
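                        # Bounded retry: on TypeErrors caused by unexpected keyword arguments the
                        # loop strips the offending field from params and tries again, giving up
                        # after 10 attempts so a broken action definition can't spin forever.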
                        #newres = ""
                        iteration_count = 0
                        found_error = ""
                        while True:
                            iteration_count += 1
                            if iteration_count >= 10:
                                newres = {
                                    "success": False,
                                    "reason": "Iteration count more than 10. This happens if the input to the action is wrong. Try remaking the action, and contact support@shuffler.io if this persists.",
                                    "details": found_error,
                                }

                                break
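                            # Timeout resolution order: 30s default, 55s for Shuffle Tools, then the
                            # SHUFFLE_APP_SDK_TIMEOUT env var overrides both if it parses as an int,
                            # e.g. SHUFFLE_APP_SDK_TIMEOUT=120 (illustrative value).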
                            try:
                                # Individual functions shouldn't take longer than this
                                # This is an attempt to make timeouts occur less often, and to nudge users toward efficient APIs
                                # PS: Not implemented for lists - only single actions as of May 2023
                                timeout = 30

                                # Check if current app is Shuffle Tools, then set to 55 due to certain actions being slow (ioc parser..)
                                # In general, this should be disabled for onprem
                                if self.action["app_name"].lower() == "shuffle tools":
                                    timeout = 55

                                timeout_env = os.getenv("SHUFFLE_APP_SDK_TIMEOUT", timeout)
                                try:
                                    timeout = int(timeout_env)
                                except Exception as e:
                                    self.logger.info(f"[WARNING] Failed parsing timeout to int: {e}")

                                #timeout = 30

                                try:
                                    executor = concurrent.futures.ThreadPoolExecutor()
                                    future = executor.submit(func, **params)
                                    newres = future.result(timeout)

                                    if not future.done():
                                        # Defensive only: future.result() above either returned or raised,
                                        # so this branch shouldn't be reachable. Cancel just in case.
                                        future.cancel()
                                        newres = json.dumps({
                                            "success": False,
                                            "reason": "Timeout error within %d seconds. This happens if we can't reach or use the API you're trying to use within the time limit." % timeout,
                                        })
                                    else:
                                        # The future is done; newres already holds the result
                                        pass

                                except concurrent.futures.TimeoutError as e:
                                    newres = json.dumps({
                                        "success": False,
                                        "reason": "Timeout error within %d seconds (2). This happens if we can't reach or use the API you're trying to use within the time limit" % timeout
                                    })

                                break
                                #thread = threading.Thread(target=func, args=(**params,))
                                #thread.start()

                                #thread.join(timeout)

                                #if thread.is_alive():
                                #    # The thread is still running, so we need to stop it
                                #    # You can handle this as needed, such as raising an exception
                                #    timeout_handler()

                                #with Timeout(timeout):
                                #    newres = func(**params)
                                #    break
                                #except Timeout.Timeout as e:
                                #    self.logger.info(f"[DEBUG] Timeout error: {e}")
                                #    newres = json.dumps({
                                #        "success": False,
                                #        "reason": "Timeout error within %d seconds. This typically happens if we can't reach the API you're trying to reach." % timeout,
                                #        "exception": str(e),
                                #    })

                                #    break
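                            # Self-healing path: a TypeError mentioning an unexpected keyword
                            # argument usually means the stored workflow carries a parameter the
                            # current app version no longer accepts, so that field is deleted and
                            # the loop retries instead of failing outright.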
                            except TypeError as e:
                                newres = ""
                                self.logger.info(f"[DEBUG] Got exec type error: {e}")
                                try:
                                    e = json.loads(f"{e}")
                                except:
                                    e = f"{e}"

                                found_error = e
                                errorstring = f"{e}"

                                if "the JSON object must be" in errorstring:
                                    self.logger.info("[ERROR] Something is wrong with the input for this function. Are lists and JSON data parsed and handled properly (0)? the JSON object must be in...")

                                    newres = json.dumps({
                                        "success": False,
                                        "reason": "An exception occurred while running this function (1). See exception for more details and contact support if this persists (support@shuffler.io)",
                                        "exception": f"{type(e).__name__} - {e}",
                                    })
                                    break
                                elif "got an unexpected keyword argument" in errorstring:
                                    fieldsplit = errorstring.split("'")
                                    if len(fieldsplit) > 1:
                                        field = fieldsplit[1]

                                        try:
                                            del params[field]
                                            self.logger.info("[WARNING] Removed invalid field %s (2)" % field)
                                        except KeyError:
                                            break
                                else:
                                    newres = json.dumps({
                                        "success": False,
                                        "reason": "You may be running an old version of this action. Try remaking the node, then contact us at support@shuffler.io if it doesn't work with all these details.",
                                        "exception": f"TypeError: {e}",
                                    })
                                    break
                            except Exception as e:
                                self.logger.info(f"[ERROR] Something is wrong with the input for this function. Are lists and JSON data parsed and handled properly (1)? err: {e}")

                                #try:
                                #    e = json.loads(f"{e}")
                                #except:
                                #    e = f"{e}"

                                newres = json.dumps({
                                    "success": False,
                                    "reason": "An exception occurred while running this function (2). See exception for more details and contact support if this persists (support@shuffler.io)",
                                    "exception": f"{type(e).__name__} - {e}",
                                })
                                break
                        # Forcing async wait in case of old apps that use async (backwards compatibility)
                        try:
                            if asyncio.iscoroutine(newres):
                                self.logger.info("[DEBUG] In coroutine (1)")

                                async def parse_value(newres):
                                    value = await asyncio.gather(
                                        newres
                                    )

                                    return value[0]

                                newres = asyncio.run(parse_value(newres))
                            else:
                                #self.logger.info("[DEBUG] Not in coroutine (1)")
                                pass
                        except Exception as e:
                            self.logger.warning(f"[ERROR] Failed to parse coroutine value for old app: {e}")
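                        # Apps may return a (result, files) tuple to attach files to the execution:
                        # the second element is uploaded via set_files and the returned file IDs are
                        # embedded in the final result. Illustrative shape (not from the source):
                        #   return "parsed 3 IOCs", [{"filename": "iocs.json", "data": b"..."}]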
                        self.logger.info("\n\n\n[INFO] Returned from execution with type(s) %s" % type(newres))
                        #self.logger.info("\n[INFO] Returned from execution with %s of types %s" % (newres, type(newres)))
                        if isinstance(newres, tuple):
                            self.logger.info(f"[INFO] Handling return as tuple: {newres}")
                            # Handles files.
                            filedata = ""
                            file_ids = []
                            self.logger.info("TUPLE: %s" % newres[1])
                            if isinstance(newres[1], list):
                                self.logger.info("[INFO] HANDLING LIST FROM RET")
                                file_ids = self.set_files(newres[1])
                            elif isinstance(newres[1], str):
                                # Checked before the object catch-all below, which would otherwise
                                # shadow it (isinstance(x, object) is True for everything)
                                self.logger.info("[INFO] Handling STRING from ret")
                                file_ids = self.set_files([newres[1]])
                            elif isinstance(newres[1], object):
                                self.logger.info("[INFO] Handling JSON from ret")
                                file_ids = self.set_files([newres[1]])
                            else:
                                self.logger.info("[INFO] NO FILES TO HANDLE")

                            tmp_result = {
                                "success": True,
                                "result": newres[0],
                                "file_ids": file_ids
                            }

                            result = json.dumps(tmp_result)
                        elif isinstance(newres, str):
                            self.logger.info("[INFO] Handling return as string of length %d" % len(newres))
                            result += newres
                        elif isinstance(newres, dict) or isinstance(newres, list):
                            try:
                                result += json.dumps(newres, indent=4)
                            except json.JSONDecodeError as e:
                                # Note: json.dumps raises TypeError rather than JSONDecodeError, so
                                # in practice the generic handler below catches dump failures
                                self.logger.info("[WARNING] Failed decoding result: %s" % e)
                                try:
                                    result += str(newres)
                                except ValueError:
                                    result += "Failed autocasting. Can't handle %s type from function. Must be string" % type(newres)
                                    self.logger.info("Can't handle type %s value from function" % (type(newres)))
                            except Exception as e:
                                self.logger.info("[ERROR] Failed to json dump. Returning as string.")
                                result += str(newres)
                        else:
                            try:
                                result += str(newres)
                            except ValueError:
                                result += "Failed autocasting. Can't handle %s type from function. Must be string" % type(newres)
                                self.logger.info("Can't handle type %s value from function" % (type(newres)))

                        #self.logger.info("[INFO] POST NEWRES RESULT!")
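                    # Multi-execution: rather than a single call, run_recursed_items below invokes
                    # the function once per parameter combination and the per-item results are
                    # merged into one JSON list string.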
                    else:
                        #self.logger.info("[INFO] APP_SDK DONE: Starting MULTI execution (length: %d) with values %s" % (minlength, multi_parameters))
                        # 1. Use number of executions based on the arrays being similar
                        # 2. Find the right value from the parsed multi_params

                        self.logger.info("[INFO] Running WITHOUT outer loop (looping)")
                        json_object = False
                        #results = await self.run_recursed_items(func, multi_parameters, {})
                        results = self.run_recursed_items(func, multi_parameters, {})
                        if isinstance(results, dict) or isinstance(results, list):
                            json_object = True

                        # Dump the result as a string of a list
                        #self.logger.info("RESULTS: %s" % results)
                        if isinstance(results, list) or isinstance(results, dict):
                            self.logger.info(f"JSON OBJECT? {json_object}")

                            # json_object is always True here (it mirrors the check above), so the
                            # manual list-join in the else branch is effectively dead code
                            if json_object:
                                try:
                                    result = json.dumps(results)
                                except json.JSONDecodeError as e:
                                    self.logger.info(f"Failed to decode: {e}")
                                    result = results
                            else:
                                result = "["
                                for item in results:
                                    try:
                                        json.loads(item)
                                        result += item
                                    except json.decoder.JSONDecodeError as e:
                                        # Common nested issue which puts " around everything
                                        self.logger.info("Decodingerror: %s" % e)
                                        try:
                                            tmpitem = item.replace("\\\"", "\"", -1)
                                            json.loads(tmpitem)
                                            result += tmpitem
                                        except:
                                            result += "\"%s\"" % item

                                    result += ", "

                                result = result[:-2]
                                result += "]"
                        else:
                            self.logger.info("Normal result - no list?")
                            result = results

                    self.action_result["status"] = "SUCCESS"
                    self.action_result["result"] = str(result)
                    if self.action_result["result"] == "":
                        self.action_result["result"] = result

                    self.logger.debug(f"[DEBUG] Executed {action['label']}-{action['id']}")
                    #self.logger.debug(f"Data: %s" % action_result)
                except TypeError as e:
                    self.logger.info("[ERROR] TypeError issue: %s" % e)
                    self.action_result["status"] = "FAILURE"
                    self.action_result["result"] = json.dumps({
                        "success": False,
                        "reason": "TypeError. Most likely due to a list that should've been a string. See details for more info.",
                        "details": f"{e}",
                    })
                    #self.action_result["result"] = "TypeError: %s" % str(e)
            else:
                self.logger.info("[DEBUG] Function %s doesn't exist?" % action["name"])
                self.logger.error(f"[ERROR] App {self.__class__.__name__}.{action['name']} is not callable")
                self.action_result["status"] = "FAILURE"
                #self.action_result["result"] = "Function %s is not callable." % actionname

                self.action_result["result"] = json.dumps({
                    "success": False,
                    "reason": "Function %s doesn't exist." % actionname,
                })

        # https://ptb.discord.com/channels/747075026288902237/882017498550112286/882043773138382890
        except (requests.exceptions.RequestException, TimeoutError) as e:
            self.logger.info(f"[ERROR] Failed to execute request (requests): {e}")
            self.logger.exception(f"[ERROR] Failed to execute {e}-{action['id']}")
            # Intentionally marked SUCCESS: request errors fail silently, with details in the result
            self.action_result["status"] = "SUCCESS"
            try:
                e = json.loads(f"{e}")
            except:
                e = f"{e}"

            try:
                self.action_result["result"] = json.dumps({
                    "success": False,
                    "reason": "Request error - failing silently. Details in detail section",
                    "details": e,
                })
            except (TypeError, json.decoder.JSONDecodeError) as e:
                # json.dumps raises TypeError when details aren't serializable
                self.action_result["result"] = f"Request error: {e}"

        except Exception as e:
            self.logger.info(f"[ERROR] Failed to execute: {e}")
            self.logger.exception(f"[ERROR] Failed to execute {e}-{action['id']}")
            self.action_result["status"] = "FAILURE"
            try:
                e = json.loads(f"{e}")
            except:
                e = f"{e}"

            self.action_result["result"] = json.dumps({
                "success": False,
                "reason": "General exception in the app. See shuffle action logs for more details.",
                "details": e,
            })
        # Send the result :)
        self.action_result["completed_at"] = int(time.time())
        self.send_result(self.action_result, headers, stream_path)

        #try:
        #    try:
        #        self.log_capture_string.flush()
        #    except Exception as e:
        #        print(f"[WARNING] Failed to flush logs (2): {e}")
        #        pass

        #    self.log_capture_string.close()
        #except:
        #    print(f"[WARNING] Failed to close logs (2): {e}")

        return
    @classmethod
    def run(cls, action=""):
        logging.basicConfig(format="{asctime} - {name} - {levelname}:{message}", style='{')
        logger = logging.getLogger(f"{cls.__name__}")
        logger.setLevel(logging.DEBUG)

        logger.info("[DEBUG] Normal execution.")
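        # run() has two modes: if SHUFFLE_SWARM_CONFIG == "run" and an exposed port is set,
        # the app stays resident as a small webserver (microservice mode); otherwise it
        # executes a single action passed in via argument/environment and exits.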
        ##############################################

        exposed_port = os.getenv("SHUFFLE_APP_EXPOSED_PORT", "")
        logger.info(f"[DEBUG] Runtime \"{runtime}\" - \"run\" indicates microservices. Port: \"{exposed_port}\"")
        if runtime == "run" and exposed_port != "":
            # Base port is 33334. Exposed port may differ based on discovery from Worker
            port = int(exposed_port)
            logger.info(f"[DEBUG] Starting webserver on port {port} (same as exposed port)")
            from flask import Flask, request
            from waitress import serve
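            # Two endpoints are exposed: /api/v1/health as a liveness probe, and
            # POST /api/v1/run, which deserializes the Worker's action payload and runs it
            # on a fresh app instance per request.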
            flask_app = Flask(__name__)
            #flask_app.config['PERMANENT_SESSION_LIFETIME'] = datetime.timedelta(minutes=5)

            #async def execute():
            @flask_app.route("/api/v1/health", methods=["GET", "POST"])
            def check_health():
                return "OK"

            @flask_app.route("/api/v1/run", methods=["POST"])
            def execute():
                if request.method == "POST":
                    #print(request.get_json(force=True))
                    requestdata = {}
                    try:
                        requestdata = json.loads(request.data)
                    except Exception as e:
                        return {
                            "success": False,
                            "reason": f"Invalid action data: {e}",
                        }

                    #logger.info(f"[DEBUG] Datatype: {type(requestdata)}: {requestdata}")

                    # Remaking the class for each request
                    app = cls(redis=None, logger=logger, console_logger=logger)
                    extra_info = ""
                    try:
                        #asyncio.run(AppBase.run(action=requestdata), debug=True)
                        #value = json.dumps(value)
                        try:
                            app.full_execution = json.dumps(requestdata["workflow_execution"])
                        except Exception as e:
                            logger.info(f"[ERROR] Failed parsing full execution from workflow_execution: {e}")
                            extra_info += f"\n{e}"

                        try:
                            app.action = requestdata["action"]
                        except Exception as e:
                            logger.info(f"[ERROR] Failed parsing action: {e}")
                            extra_info += f"\n{e}"

                        try:
                            app.authorization = requestdata["authorization"]
                            app.current_execution_id = requestdata["execution_id"]
                        except Exception as e:
                            logger.info(f"[ERROR] Failed parsing auth and exec id: {e}")
                            extra_info += f"\n{e}"

                        # BASE URL (backend)
                        try:
                            app.url = requestdata["url"]
                            logger.info(f"BACKEND URL (url): {app.url}")
                        except Exception as e:
                            logger.info(f"[ERROR] Failed parsing url (backend): {e}")
                            extra_info += f"\n{e}"

                        # URL (worker)
                        try:
                            app.base_url = requestdata["base_url"]
                            logger.info(f"WORKER URL (base url): {app.base_url}")
                        except Exception as e:
                            logger.info(f"[ERROR] Failed parsing base url (worker): {e}")
                            extra_info += f"\n{e}"

                        #await
                        app.execute_action(app.action)
                        logger.info("[DEBUG] Done awaiting app action running")
                    except Exception as e:
                        return {
                            "success": False,
                            "reason": f"Problem in execution {e}",
                            "execution_issues": extra_info,
                        }

                    return {
                        "success": True,
                        "reason": "App successfully finished",
                        "execution_issues": extra_info,
                    }
                else:
                    return {
                        "success": False,
                        "reason": f"HTTP method {request.method} not allowed",
                    }
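            # Illustrative request against the run endpoint (field values are placeholders,
            # not from the source):
            #   curl -X POST http://localhost:33334/api/v1/run \
            #     -d '{"action": {...}, "workflow_execution": {...}, "authorization": "...",
            #          "execution_id": "...", "url": "...", "base_url": "..."}'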
            logger.info(f"[DEBUG] Serving on port {port}")

            #flask_app.run(
            #    host="0.0.0.0",
            #    port=port,
            #    threaded=True,
            #    processes=1,
            #    debug=False,
            #)

            serve(
                flask_app,
                host="0.0.0.0",
                port=port,
                threads=8,
                channel_timeout=30,
                expose_tracebacks=True,
                asyncore_use_poll=True,
            )
        #######################
        else:
            # Has to start like this due to imports in other apps
            # Move it outside everything?
            app = cls(redis=None, logger=logger, console_logger=logger)
            #logger.info(f"[DEBUG] Action: {action}")

            if isinstance(action, str):
                logger.info("[DEBUG] Normal execution (env var). Action is a string.")
            elif isinstance(action, object):
                # Catch-all: isinstance(x, object) is True for everything, so the
                # else branch below is unreachable in practice
                logger.info("[DEBUG] OBJECT execution (cloud). Action is NOT a string.")
                app.action = action

                try:
                    app.authorization = action["authorization"]
                    app.current_execution_id = action["execution_id"]
                except:
                    pass

                # Backend URL
                try:
                    app.url = action["url"]
                except:
                    pass

                # Worker callback URL
                try:
                    app.base_url = action["base_url"]
                except:
                    pass
            else:
                logger.info("ACTION TYPE (unhandled): %s" % type(action))

            app.execute_action(app.action)
if __name__ == "__main__":
    AppBase.run()