hcornet 506716e703
Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
first sync
2025-03-04 07:59:21 +01:00

2658 lines
91 KiB
Go

package main
import (
"github.com/shuffle/shuffle-shared"
//"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
//"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
dockerclient "github.com/docker/docker/client"
//"github.com/go-git/go-billy/v5/memfs"
//newdockerclient "github.com/fsouza/go-dockerclient"
//"github.com/satori/go.uuid"
"github.com/gorilla/mux"
"github.com/patrickmn/go-cache"
"github.com/satori/go.uuid"
// No necessary outside shared
"cloud.google.com/go/datastore"
"cloud.google.com/go/storage"
//k8s deps
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
// "k8s.io/client-go/util/retry"
)
// This is getting out of hand :)
var environment = os.Getenv("ENVIRONMENT_NAME")
var baseUrl = os.Getenv("BASE_URL")
var appCallbackUrl = os.Getenv("BASE_URL")
var cleanupEnv = strings.ToLower(os.Getenv("CLEANUP"))
var dockerApiVersion = strings.ToLower(os.Getenv("DOCKER_API_VERSION"))
var swarmNetworkName = os.Getenv("SHUFFLE_SWARM_NETWORK_NAME")
var timezone = os.Getenv("TZ")
var baseimagename = "frikky/shuffle"
// var baseimagename = "registry.hub.docker.com/frikky/shuffle"
var registryName = "registry.hub.docker.com"
var sleepTime = 2
var requestCache *cache.Cache
var topClient *http.Client
var data string
var requestsSent = 0
var appsInitialized = false
var hostname string
/*
var environments []string
var parents map[string][]string
var children map[string][]string
var visited []string
var executed []string
var nextActions []string
var extra int
var startAction string
*/
//var results []shuffle.ActionResult
//var allLogs map[string]string
//var containerIds []string
var downloadedImages []string
// Images to be autodeployed in the latest version of Shuffle.
var autoDeploy = map[string]string{
"http:1.3.0": "frikky/shuffle:http_1.3.0",
"http:1.4.0": "frikky/shuffle:http_1.4.0",
"shuffle-tools:1.2.0": "frikky/shuffle:shuffle-tools_1.2.0",
"shuffle-subflow:1.0.0": "frikky/shuffle:shuffle-subflow_1.0.0",
"shuffle-subflow:1.1.0": "frikky/shuffle:shuffle-subflow_1.1.0",
}
//"testing:1.0.0": "frikky/shuffle:testing_1.0.0",
//fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
// New Worker mappings
// visited, appendActions, nextActions, notFound, queueNodes, toRemove, executed, env
var portMappings map[string]int
var baseport = 33333
type UserInputSubflow struct {
Argument string `json:"execution_argument"`
ContinueUrl string `json:"continue_url"`
CancelUrl string `json:"cancel_url"`
}
// removes every container except itself (worker)
func shutdown(workflowExecution shuffle.WorkflowExecution, nodeId string, reason string, handleResultSend bool) {
log.Printf("[DEBUG][%s] Shutdown (%s) started with reason %#v. Result amount: %d. ResultsSent: %d, Send result: %#v, Parent: %#v", workflowExecution.ExecutionId, workflowExecution.Status, reason, len(workflowExecution.Results), requestsSent, handleResultSend, workflowExecution.ExecutionParent)
//reason := "Error in execution"
sleepDuration := 1
if handleResultSend && requestsSent < 2 {
shutdownData, err := json.Marshal(workflowExecution)
if err == nil {
sendResult(workflowExecution, shutdownData)
log.Printf("[WARNING][%s] Sent shutdown update with %d results and result value %s", workflowExecution.ExecutionId, len(workflowExecution.Results), reason)
} else {
log.Printf("[WARNING][%s] Failed to send update: %s", workflowExecution.ExecutionId, err)
}
time.Sleep(time.Duration(sleepDuration) * time.Second)
}
if len(reason) > 0 && len(nodeId) > 0 {
//log.Printf("[INFO] Running abort of workflow because it should be finished")
abortUrl := fmt.Sprintf("%s/api/v1/workflows/%s/executions/%s/abort", baseUrl, workflowExecution.Workflow.ID, workflowExecution.ExecutionId)
path := fmt.Sprintf("?reason=%s", url.QueryEscape(reason))
if len(nodeId) > 0 {
path += fmt.Sprintf("&node=%s", url.QueryEscape(nodeId))
}
if len(environment) > 0 {
path += fmt.Sprintf("&env=%s", url.QueryEscape(environment))
}
//fmt.Printf(url.QueryEscape(query))
abortUrl += path
log.Printf("[DEBUG][%s] Abort URL: %s", workflowExecution.ExecutionId, abortUrl)
req, err := http.NewRequest(
"GET",
abortUrl,
nil,
)
if err != nil {
log.Printf("[WARNING][%s] Failed building request: %s", workflowExecution.ExecutionId, err)
}
authorization := os.Getenv("AUTHORIZATION")
if len(authorization) > 0 {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
} else {
log.Printf("[ERROR][%s] No authorization specified for abort", workflowExecution.ExecutionId)
}
req.Header.Add("Content-Type", "application/json")
client := shuffle.GetExternalClient(baseUrl)
//log.Printf("[DEBUG][%s] All App Logs: %#v", workflowExecution.ExecutionId, allLogs)
newresp, err := client.Do(req)
if err != nil {
log.Printf("[WARNING][%s] Failed abort request: %s", workflowExecution.ExecutionId, err)
} else {
defer newresp.Body.Close()
}
} else {
//log.Printf("[INFO][%s] NOT running abort during shutdown.", workflowExecution.ExecutionId)
}
log.Printf("[DEBUG][%s] Finished shutdown (after %d seconds). ", workflowExecution.ExecutionId, sleepDuration)
//Finished shutdown (after %d seconds). ", sleepDuration)
// Allows everything to finish in subprocesses (apps)
time.Sleep(time.Duration(sleepDuration) * time.Second)
os.Exit(3)
}
// }
func isRunningInCluster() bool {
_, existsHost := os.LookupEnv("KUBERNETES_SERVICE_HOST")
_, existsPort := os.LookupEnv("KUBERNETES_SERVICE_PORT")
return existsHost && existsPort
}
func buildEnvVars(envMap map[string]string) []corev1.EnvVar {
var envVars []corev1.EnvVar
for key, value := range envMap {
envVars = append(envVars, corev1.EnvVar{Name: key, Value: value})
}
return envVars
}
func getKubernetesClient() (*kubernetes.Clientset, error) {
if isRunningInCluster() {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientset, nil
} else {
home := homedir.HomeDir()
kubeconfigPath := filepath.Join(home, ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientset, nil
}
}
// Deploys the internal worker whenever something happens
func deployApp(cli *dockerclient.Client, image string, identifier string, env []string, workflowExecution shuffle.WorkflowExecution, action shuffle.Action) error {
// log.Printf("################################### new call to deployApp ###################################")
// log.Printf("image: %s", image)
// log.Printf("identifier: %s", identifier)
// log.Printf("execution: %+v", workflowExecution)
log.Printf("[DEBUG] Adding SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT"))
env = append(env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))
if os.Getenv("IS_KUBERNETES") == "true" {
namespace := "shuffle"
localRegistry := os.Getenv("REGISTRY_URL")
envMap := make(map[string]string)
for _, envStr := range env {
parts := strings.SplitN(envStr, "=", 2)
if len(parts) == 2 {
envMap[parts[0]] = parts[1]
}
}
clientset, err := getKubernetesClient()
if err != nil {
fmt.Println("[ERROR]Error getting kubernetes client:", err)
// os.Exit(1)
}
log.Printf("[DEBUG] Got kubernetes client")
str := strings.ToLower(identifier)
strSplit := strings.Split(str, "_")
value := strSplit[0]
value = strings.ReplaceAll(value, "_", "-")
// checking if app is generated or not
appDetails := strings.Split(image, ":")[1]
appDetailsSplit := strings.Split(appDetails, "_")
appName := strings.Join(appDetailsSplit[:len(appDetailsSplit)-1], "_")
appVersion := appDetailsSplit[len(appDetailsSplit)-1]
// log.Printf("APP VERSION IS: %s", appVersion)
for _, app := range workflowExecution.Workflow.Actions {
// log.Printf("[DEBUG] App: %s, Version: %s", appName, appVersion)
// log.Printf("[DEBUG] Checking app %s with version %s", app.AppName, app.AppVersion)
if app.AppName == appName && app.AppVersion == appVersion {
if app.Generated == true {
log.Printf("[DEBUG] Generated app, setting local registry")
image = fmt.Sprintf("%s/%s", localRegistry, image)
break
} else {
log.Printf("[DEBUG] Not generated app, setting shuffle registry")
}
}
}
//fix naming convention
podUuid := uuid.NewV4().String()
podName := fmt.Sprintf("%s-%s", value, podUuid)
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: map[string]string{
"app": "shuffle-app",
"executionId": workflowExecution.ExecutionId,
},
},
Spec: corev1.PodSpec{
// NodeName: "worker1"
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: value,
Image: image,
Env: buildEnvVars(envMap),
// ImagePullPolicy: corev1.PullAlways,
},
},
},
}
createdPod, err := clientset.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{})
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating pod: %v\n", err)
// os.Exit(1)
}
fmt.Printf("[DEBUG] Created pod %q in namespace %q\n", createdPod.Name, createdPod.Namespace)
} else {
// form basic hostConfig
ctx := context.Background()
if action.AppName == "shuffle-subflow" {
// Automatic replacement of URL
for paramIndex, param := range action.Parameters {
if param.Name != "backend_url" {
continue
}
if strings.Contains(param.Value, "shuffle-backend") {
// Automatic replacement as this is default
action.Parameters[paramIndex].Value = os.Getenv("BASE_URL")
log.Printf("[DEBUG][%s] Replaced backend_url with %s", workflowExecution.ExecutionId, os.Getenv("BASE_URL"))
}
}
}
// Max 10% CPU every second
//CPUShares: 128,
//CPUQuota: 10000,
//CPUPeriod: 100000,
hostConfig := &container.HostConfig{
LogConfig: container.LogConfig{
Type: "json-file",
Config: map[string]string{
"max-size": "10m",
},
},
Resources: container.Resources{},
}
hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
// Removing because log extraction should happen first
if cleanupEnv == "true" {
hostConfig.AutoRemove = true
}
// FIXME: Add proper foldermounts here
//log.Printf("\n\nPRE FOLDERMOUNT\n\n")
//volumeBinds := []string{"/tmp/shuffle-mount:/rules"}
//volumeBinds := []string{"/tmp/shuffle-mount:/rules"}
volumeBinds := []string{}
if len(volumeBinds) > 0 {
log.Printf("[DEBUG] Setting up binds for container!")
hostConfig.Binds = volumeBinds
hostConfig.Mounts = []mount.Mount{}
for _, bind := range volumeBinds {
if !strings.Contains(bind, ":") || strings.Contains(bind, "..") || strings.HasPrefix(bind, "~") {
log.Printf("[WARNING] Bind %s is invalid.", bind)
continue
}
log.Printf("[DEBUG] Appending bind %s", bind)
bindSplit := strings.Split(bind, ":")
sourceFolder := bindSplit[0]
destinationFolder := bindSplit[0]
hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{
Type: mount.TypeBind,
Source: sourceFolder,
Target: destinationFolder,
})
}
} else {
//log.Printf("[WARNING] Not mounting folders")
}
config := &container.Config{
Image: image,
Env: env,
}
// Checking as late as possible, just in case.
newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
_, err := shuffle.GetCache(ctx, newExecId)
if err == nil {
log.Printf("\n\n[DEBUG] Result for %s already found - returning\n\n", newExecId)
return nil
}
cacheData := []byte("1")
err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
if err != nil {
log.Printf("[WARNING] Failed setting cache for action %s: %s", newExecId, err)
} else {
log.Printf("[DEBUG] Adding %s to cache. Name: %s", newExecId, action.Name)
}
if action.ExecutionDelay > 0 {
log.Printf("[DEBUG] Running app %s in docker with delay of %d", action.Name, action.ExecutionDelay)
waitTime := time.Duration(action.ExecutionDelay) * time.Second
time.AfterFunc(waitTime, func() {
DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
})
} else {
log.Printf("[DEBUG] Running app %s in docker NORMALLY as there is no delay set with identifier %s", action.Name, identifier)
returnvalue := DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
log.Printf("[DEBUG] Normal deploy ret: %s", returnvalue)
return returnvalue
}
return nil
}
return nil
}
func cleanupExecution(clientset *kubernetes.Clientset, workflowExecution shuffle.WorkflowExecution, namespace string) error {
workerName := fmt.Sprintf("worker-%s", workflowExecution.ExecutionId)
labelSelector := fmt.Sprintf("app=shuffle-app,executionId=%s", workflowExecution.ExecutionId)
podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return fmt.Errorf("[ERROR]failed to list apps with label selector %s: %v", labelSelector, err)
}
for _, pod := range podList.Items {
err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete app %s: %v", pod.Name, err)
}
fmt.Printf("App %s in namespace %s deleted.\n", pod.Name, namespace)
}
podErr := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), workerName, metav1.DeleteOptions{})
if podErr != nil {
return fmt.Errorf("[ERROR] failed to delete the worker %s in namespace %s: %v", workerName, namespace, podErr)
}
fmt.Printf("[DEBUG] %s in namespace %s deleted.\n", workerName, namespace)
return nil
}
func DeployContainer(ctx context.Context, cli *dockerclient.Client, config *container.Config, hostConfig *container.HostConfig, identifier string, workflowExecution shuffle.WorkflowExecution, newExecId string) error {
cont, err := cli.ContainerCreate(
ctx,
config,
hostConfig,
nil,
nil,
identifier,
)
if err != nil {
//log.Printf("[ERROR] Failed creating container: %s", err)
if !strings.Contains(err.Error(), "Conflict. The container name") {
log.Printf("[ERROR] Container CREATE error (1): %s", err)
cacheErr := shuffle.DeleteCache(ctx, newExecId)
if cacheErr != nil {
log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
}
return err
} else {
parsedUuid := uuid.NewV4()
identifier = fmt.Sprintf("%s-%s", identifier, parsedUuid)
//hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
log.Printf("[DEBUG] 2 - Identifier: %s", identifier)
cont, err = cli.ContainerCreate(
context.Background(),
config,
hostConfig,
nil,
nil,
identifier,
)
if err != nil {
log.Printf("[ERROR] Container create error (2): %s", err)
cacheErr := shuffle.DeleteCache(ctx, newExecId)
if cacheErr != nil {
log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
}
return err
}
//log.Printf("[DEBUG] Made new container ID
}
}
err = cli.ContainerStart(ctx, cont.ID, types.ContainerStartOptions{})
if err != nil {
if strings.Contains(fmt.Sprintf("%s", err), "cannot join network") || strings.Contains(fmt.Sprintf("%s", err), "No such container") {
parsedUuid := uuid.NewV4()
identifier = fmt.Sprintf("%s-%s-nonetwork", identifier, parsedUuid)
hostConfig = &container.HostConfig{
LogConfig: container.LogConfig{
Type: "json-file",
Config: map[string]string{
"max-size": "10m",
},
},
Resources: container.Resources{},
}
cont, err = cli.ContainerCreate(
context.Background(),
config,
hostConfig,
nil,
nil,
identifier,
)
if err != nil {
log.Printf("[ERROR] Container create error (3): %s", err)
cacheErr := shuffle.DeleteCache(ctx, newExecId)
if cacheErr != nil {
log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
}
return err
}
log.Printf("[DEBUG] Running secondary check without network with worker")
err = cli.ContainerStart(ctx, cont.ID, types.ContainerStartOptions{})
}
if err != nil {
log.Printf("[ERROR] Failed to start container in environment %s: %s", environment, err)
cacheErr := shuffle.DeleteCache(ctx, newExecId)
if cacheErr != nil {
log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
}
//shutdown(workflowExecution, workflowExecution.Workflow.ID, true)
return err
}
}
log.Printf("[DEBUG] Container %s was created for %s", cont.ID, identifier)
// Waiting to see if it exits.. Stupid, but stable(r)
if workflowExecution.ExecutionSource != "default" {
log.Printf("[INFO] Handling NON-default execution source %s - NOT waiting or validating!", workflowExecution.ExecutionSource)
} else if workflowExecution.ExecutionSource == "default" {
log.Printf("[INFO] Handling DEFAULT execution source %s - SKIPPING wait anyway due to exited issues!", workflowExecution.ExecutionSource)
}
log.Printf("[DEBUG] Deployed container ID %s", cont.ID)
//containerIds = append(containerIds, cont.ID)
return nil
}
func removeContainer(containername string) error {
ctx := context.Background()
cli, err := dockerclient.NewEnvClient()
if err != nil {
log.Printf("[DEBUG] Unable to create docker client: %s", err)
return err
}
// FIXME - ucnomment
// containers, err := cli.ContainerList(ctx, types.ContainerListOptions{
// All: true,
// })
_ = ctx
_ = cli
//if err := cli.ContainerStop(ctx, containername, nil); err != nil {
// log.Printf("Unable to stop container %s - running removal anyway, just in case: %s", containername, err)
//}
removeOptions := types.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
}
// FIXME - remove comments etc
_ = removeOptions
//if err := cli.ContainerRemove(ctx, containername, removeOptions); err != nil {
// log.Printf("Unable to remove container: %s", err)
//}
return nil
}
func runFilter(workflowExecution shuffle.WorkflowExecution, action shuffle.Action) {
// 1. Get the parameter $.#.id
if action.Label == "filter_cases" && len(action.Parameters) > 0 {
if action.Parameters[0].Variant == "ACTION_RESULT" {
param := action.Parameters[0]
value := param.Value
_ = value
// Loop cases.. Hmm, that's tricky
}
} else {
log.Printf("No handler for filter %s with %d params", action.Label, len(action.Parameters))
}
}
func removeIndex(s []string, i int) []string {
s[len(s)-1], s[i] = s[i], s[len(s)-1]
return s[:len(s)-1]
}
func handleExecutionResult(workflowExecution shuffle.WorkflowExecution) {
ctx := context.Background()
//log.Printf("[DEBUG][%s] Pre DecideExecution", workflowExecution.ExecutionId)
workflowExecution, relevantActions := shuffle.DecideExecution(ctx, workflowExecution, environment)
startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
dockercli, err := dockerclient.NewEnvClient()
if err != nil {
log.Printf("[ERROR] Unable to create docker client (3): %s", err)
return
}
// log.Printf("\n\n[DEBUG] Got %d relevant action(s) to run!\n\n", len(relevantActions))
for _, action := range relevantActions {
appname := action.AppName
appversion := action.AppVersion
appname = strings.Replace(appname, ".", "-", -1)
appversion = strings.Replace(appversion, ".", "-", -1)
parsedAppname := strings.Replace(strings.ToLower(action.AppName), " ", "-", -1)
image := fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion)
if strings.Contains(image, " ") {
image = strings.ReplaceAll(image, " ", "-")
}
// Added UUID to identifier just in case
//identifier := fmt.Sprintf("%s_%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId, uuid.NewV4())
identifier := fmt.Sprintf("%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId)
if strings.Contains(identifier, " ") {
identifier = strings.ReplaceAll(identifier, " ", "-")
}
//if arrayContains(executed, action.ID) || arrayContains(visited, action.ID) {
// log.Printf("[WARNING] Action %s is already executed")
// continue
//}
//visited = append(visited, action.ID)
//executed = append(executed, action.ID)
// FIXME - check whether it's running locally yet too
stats, err := dockercli.ContainerInspect(context.Background(), identifier)
if err != nil || stats.ContainerJSONBase.State.Status != "running" {
// REMOVE
if err == nil {
log.Printf("[DEBUG][%s] Docker Container Status: %s, should kill: %s", workflowExecution.ExecutionId, stats.ContainerJSONBase.State.Status, identifier)
err = removeContainer(identifier)
if err != nil {
log.Printf("Error killing container: %s", err)
}
} else {
//log.Printf("WHAT TO DO HERE?: %s", err)
}
} else if stats.ContainerJSONBase.State.Status == "running" {
//log.Printf("
continue
}
if len(action.Parameters) == 0 {
action.Parameters = []shuffle.WorkflowAppActionParameter{}
}
if len(action.Errors) == 0 {
action.Errors = []string{}
}
// marshal action and put it in there rofl
log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))
actionData, err := json.Marshal(action)
if err != nil {
log.Printf("[WARNING] Failed unmarshalling action: %s", err)
continue
}
if action.AppID == "0ca8887e-b4af-4e3e-887c-87e9d3bc3d3e" {
log.Printf("[DEBUG] Should run filter: %#v\n\n", action)
runFilter(workflowExecution, action)
continue
}
executionData, err := json.Marshal(workflowExecution)
if err != nil {
log.Printf("[ERROR] Failed marshalling executiondata: %s", err)
executionData = []byte("")
}
// Sending full execution so that it won't have to load in every app
// This might be an issue if they can read environments, but that's alright
// if everything is generated during execution
//log.Printf("[DEBUG][%s] Deployed with CALLBACK_URL %s and BASE_URL %s", workflowExecution.ExecutionId, appCallbackUrl, baseUrl)
env := []string{
fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
fmt.Sprintf("TZ=%s", timezone),
fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", os.Getenv("SHUFFLE_LOGS_DISABLED")),
}
if len(actionData) >= 100000 {
log.Printf("[WARNING] Omitting some data from action execution. Length: %d. Fix in SDK!", len(actionData))
newParams := []shuffle.WorkflowAppActionParameter{}
for _, param := range action.Parameters {
paramData, err := json.Marshal(param)
if err != nil {
log.Printf("[WARNING] Failed to marshal param %s: %s", param.Name, err)
newParams = append(newParams, param)
continue
}
if len(paramData) >= 50000 {
log.Printf("[WARNING] Removing a lot of data from param %s with length %d", param.Name, len(paramData))
param.Value = "SHUFFLE_AUTO_REMOVED"
}
newParams = append(newParams, param)
}
action.Parameters = newParams
actionData, err = json.Marshal(action)
if err == nil {
log.Printf("[DEBUG] Ran data replace on action %s. new length: %d", action.Name, len(actionData))
} else {
log.Printf("[WARNING] Failed to marshal new actionData: %s", err)
}
} else {
//log.Printf("[DEBUG] Actiondata is NOT 100000 in length. Adding as normal.")
}
actionEnv := fmt.Sprintf("ACTION=%s", string(actionData))
env = append(env, actionEnv)
if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
//log.Printf("APPENDING PROXY TO THE APP!")
env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
}
// Fixes issue:
// standard_go init_linux.go:185: exec user process caused "argument list too long"
// https://devblogs.microsoft.com/oldnewthing/20100203-00/?p=15083
// FIXME: Ensure to NEVER do this anymore
// This potentially breaks too much stuff. Better to have the app poll the data.
_ = executionData
/*
maxSize := 32700 - len(string(actionData)) - 2000
if len(executionData) < maxSize {
log.Printf("[INFO] ADDING FULL_EXECUTION because size is smaller than %d", maxSize)
env = append(env, fmt.Sprintf("FULL_EXECUTION=%s", string(executionData)))
} else {
log.Printf("[WARNING] Skipping FULL_EXECUTION because size is larger than %d", maxSize)
}
*/
// Uses a few ways of getting / checking if an app is available
// 1. Try original with lowercase
// 2. Go to original (no spaces)
// 3. Add remote repo location
images := []string{
image,
fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion),
fmt.Sprintf("%s/%s:%s_%s", registryName, baseimagename, parsedAppname, action.AppVersion),
}
// If cleanup is set, it should run for efficiency
pullOptions := types.ImagePullOptions{}
if cleanupEnv == "true" {
err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (2)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
err := downloadDockerImageBackend(&http.Client{Timeout: 60 * time.Second}, image)
executed := false
if err == nil {
log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", image)
//err = deployApp(dockercli, image, identifier, env, workflow, action)
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (41)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
} else {
executed = true
}
}
if !executed {
image = images[2]
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (3)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
//log.Printf("[WARNING] Failed CLEANUP execution. Downloading image %s remotely.", image)
log.Printf("[WARNING] Failed to download image %s (CLEANUP): %s", image, err)
reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
if err != nil {
log.Printf("[ERROR] Failed getting %s. Couldn't be find locally, AND is missing.", image)
log.Printf("[DEBUG] Shutting down (4)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
} else {
baseTag := strings.Split(image, ":")
if len(baseTag) > 1 {
tag := baseTag[1]
log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)
// Remapping
ctx := context.Background()
dockercli.ImageTag(ctx, image, fmt.Sprintf("frikky/shuffle:%s", tag))
dockercli.ImageTag(ctx, image, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
}
}
buildBuf := new(strings.Builder)
_, err = io.Copy(buildBuf, reader)
if err != nil && !strings.Contains(fmt.Sprintf("%s", err.Error()), "Conflict. The container name") {
log.Printf("[ERROR] Error in IO copy: %s", err)
log.Printf("[DEBUG] Shutting down (5)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
} else {
if strings.Contains(buildBuf.String(), "errorDetail") {
log.Printf("[ERROR] Docker build:\n%s\nERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), image)
log.Printf("[DEBUG] Shutting down (6)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
log.Printf("[INFO] Successfully downloaded %s", image)
}
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (7)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
if strings.Contains(err.Error(), "No such image") {
//log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
log.Printf("[ERROR] Image doesn't exist. Shutting down")
log.Printf("[DEBUG] Shutting down (8)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
}
}
}
}
} else {
err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
log.Printf("[DEBUG] Failed deploying app? %s", err)
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (9)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
// Trying to replace with lowercase to deploy again. This seems to work with Dockerhub well.
// FIXME: Should try to remotely download directly if this persists.
image = images[1]
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (10)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
log.Printf("[DEBUG][%s] Failed deploy. Downloading image %s: %s", workflowExecution.ExecutionId, image, err)
err := downloadDockerImageBackend(&http.Client{Timeout: 60 * time.Second}, image)
executed := false
if err == nil {
log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", image)
//err = deployApp(dockercli, image, identifier, env, workflow, action)
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (40)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
} else {
executed = true
}
}
if !executed {
image = images[2]
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (11)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
log.Printf("[WARNING] Failed deploying image THREE TIMES. Attempting to download %s as last resort from backend and dockerhub: %s", image, err)
reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
log.Printf("[ERROR] Failed getting %s. The couldn't be find locally, AND is missing.", image)
log.Printf("[DEBUG] Shutting down (12)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
} else {
baseTag := strings.Split(image, ":")
if len(baseTag) > 1 {
tag := baseTag[1]
log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)
// Remapping
ctx := context.Background()
dockercli.ImageTag(ctx, image, fmt.Sprintf("frikky/shuffle:%s", tag))
dockercli.ImageTag(ctx, image, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
}
}
buildBuf := new(strings.Builder)
_, err = io.Copy(buildBuf, reader)
if err != nil {
log.Printf("[ERROR] Error in IO copy: %s", err)
log.Printf("[DEBUG] Shutting down (13)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
} else {
if strings.Contains(buildBuf.String(), "errorDetail") {
log.Printf("[ERROR] Docker build:\n%s\nERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), image)
log.Printf("[DEBUG] Shutting down (14)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("Error deploying container: %s", buildBuf.String()), true)
return
}
log.Printf("[INFO] Successfully downloaded %s", image)
}
}
err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
if strings.Contains(err.Error(), "exited prematurely") {
log.Printf("[DEBUG] Shutting down (15)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
if strings.Contains(err.Error(), "No such image") {
//log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
log.Printf("[ERROR] Image doesn't exist. Shutting down")
log.Printf("[DEBUG] Shutting down (16)")
shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
return
}
}
}
}
}
}
//log.Printf("[INFO][%s] Adding visited (3): %s (%s). Actions: %d, Results: %d", workflowExecution.ExecutionId, action.Label, action.ID, len(workflowExecution.Workflow.Actions), len(workflowExecution.Results))
visited = append(visited, action.ID)
executed = append(executed, action.ID)
// If children of action.ID are NOT in executed:
// Remove them from visited.
//log.Printf("EXECUTED: %#v", executed)
}
//log.Printf(nextAction)
//log.Printf(startAction, children[startAction])
// FIXME - new request here
// FIXME - clean up stopped (remove) containers with this execution id
err = shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
if err != nil {
log.Printf("\n\n[ERROR] Failed to update exec variables for execution %s: %s (2)\n\n", workflowExecution.ExecutionId, err)
}
if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
shutdownCheck := true
for _, result := range workflowExecution.Results {
if result.Status == "EXECUTING" || result.Status == "WAITING" {
// Cleaning up executing stuff
shutdownCheck = false
// USED TO BE CONTAINER REMOVAL
// FIXME - send POST request to kill the container
//log.Printf("Should remove (POST request) stopped containers")
//ret = requests.post("%s%s" % (self.url, stream_path), headers=headers, json=action_result)
}
}
if shutdownCheck {
log.Printf("[INFO][%s] BREAKING BECAUSE RESULTS IS SAME LENGTH AS ACTIONS. SHOULD CHECK ALL RESULTS FOR WHETHER THEY'RE DONE", workflowExecution.ExecutionId)
validateFinished(workflowExecution)
log.Printf("[DEBUG][%s] Shutting down (17)", workflowExecution.ExecutionId)
if os.Getenv("IS_KUBERNETES") == "true" {
// log.Printf("workflow execution: %#v", workflowExecution)
clientset, err := getKubernetesClient()
if err != nil {
fmt.Println("[ERROR]Error getting kubernetes client:", err)
os.Exit(1)
}
cleanupExecution(clientset, workflowExecution, "shuffle")
} else {
shutdown(workflowExecution, "", "", true)
}
return
}
}
time.Sleep(time.Duration(sleepTime) * time.Second)
return
}
func executionInit(workflowExecution shuffle.WorkflowExecution) error {
parents := map[string][]string{}
children := map[string][]string{}
nextActions := []string{}
extra := 0
startAction := workflowExecution.Start
//log.Printf("[INFO][%s] STARTACTION: %s", workflowExecution.ExecutionId, startAction)
if len(startAction) == 0 {
log.Printf("[INFO][%s] Didn't find execution start action. Setting it to workflow start action.", workflowExecution.ExecutionId)
startAction = workflowExecution.Workflow.Start
}
// Setting up extra counter
for _, trigger := range workflowExecution.Workflow.Triggers {
//log.Printf("[DEBUG] Appname trigger (0): %s", trigger.AppName)
if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
extra += 1
}
}
nextActions = append(nextActions, startAction)
for _, branch := range workflowExecution.Workflow.Branches {
// Check what the parent is first. If it's trigger - skip
sourceFound := false
destinationFound := false
for _, action := range workflowExecution.Workflow.Actions {
if action.ID == branch.SourceID {
sourceFound = true
}
if action.ID == branch.DestinationID {
destinationFound = true
}
}
for _, trigger := range workflowExecution.Workflow.Triggers {
//log.Printf("Appname trigger (0): %s", trigger.AppName)
if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
if trigger.ID == branch.SourceID {
sourceFound = true
} else if trigger.ID == branch.DestinationID {
destinationFound = true
}
}
}
if sourceFound {
parents[branch.DestinationID] = append(parents[branch.DestinationID], branch.SourceID)
} else {
log.Printf("[DEBUG] ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
}
if destinationFound {
children[branch.SourceID] = append(children[branch.SourceID], branch.DestinationID)
} else {
log.Printf("[DEBUG] ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
}
}
/*
log.Printf("\n\n\n[INFO] CHILDREN FOUND: %#v", children)
log.Printf("[INFO] PARENTS FOUND: %#v", parents)
log.Printf("[INFO] NEXT ACTIONS: %#v\n\n", nextActions)
*/
log.Printf("[INFO][%s] shuffle.Actions: %d + Special shuffle.Triggers: %d", workflowExecution.ExecutionId, len(workflowExecution.Workflow.Actions), extra)
onpremApps := []string{}
toExecuteOnprem := []string{}
for _, action := range workflowExecution.Workflow.Actions {
if strings.ToLower(action.Environment) != strings.ToLower(environment) {
continue
}
toExecuteOnprem = append(toExecuteOnprem, action.ID)
actionName := fmt.Sprintf("%s:%s_%s", baseimagename, action.AppName, action.AppVersion)
found := false
for _, app := range onpremApps {
if actionName == app {
found = true
}
}
if !found {
onpremApps = append(onpremApps, actionName)
}
}
if len(onpremApps) == 0 {
return errors.New(fmt.Sprintf("No apps to handle onprem (%s)", environment))
}
pullOptions := types.ImagePullOptions{}
_ = pullOptions
for _, image := range onpremApps {
//log.Printf("[INFO] Image: %s", image)
// Kind of gambling that the image exists.
if strings.Contains(image, " ") {
image = strings.ReplaceAll(image, " ", "-")
}
// FIXME: Reimplement for speed later
// Skip to make it faster
//reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
//if err != nil {
// log.Printf("Failed getting %s. The app is missing or some other issue", image)
// shutdown(workflowExecution)
//}
////io.Copy(os.Stdout, reader)
//_ = reader
//log.Printf("Successfully downloaded and built %s", image)
}
ctx := context.Background()
visited := []string{}
executed := []string{}
environments := []string{}
for _, action := range workflowExecution.Workflow.Actions {
found := false
for _, environment := range environments {
if action.Environment == environment {
found = true
break
}
}
if !found {
environments = append(environments, action.Environment)
}
}
//var visited []string
//var executed []string
err := shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
if err != nil {
log.Printf("\n\n[ERROR] Failed to update exec variables for execution %s: %s\n\n", workflowExecution.ExecutionId, err)
}
return nil
}
func handleDefaultExecution(client *http.Client, req *http.Request, workflowExecution shuffle.WorkflowExecution) error {
// if no onprem runs (shouldn't happen, but extra check), exit
// if there are some, load the images ASAP for the app
ctx := context.Background()
//startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
startAction, extra, _, _, _, _, _, _ := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
err := executionInit(workflowExecution)
if err != nil {
log.Printf("[INFO] Workflow setup failed for %s: %s", workflowExecution.ExecutionId, err)
log.Printf("[DEBUG] Shutting down (18)")
shutdown(workflowExecution, "", "", true)
}
log.Printf("[DEBUG] DEFAULT EXECUTION Startaction: %s", startAction)
setWorkflowExecution(ctx, workflowExecution, false)
streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
for {
//fullUrl := fmt.Sprintf("%s/api/v1/workflows/%s/executions/%s/abort", baseUrl, workflowExecution.Workflow.ID, workflowExecution.ExecutionId)
//log.Printf("[INFO] URL: %s", fullUrl)
req, err := http.NewRequest(
"POST",
streamResultUrl,
bytes.NewBuffer([]byte(data)),
)
newresp, err := topClient.Do(req)
if err != nil {
log.Printf("[ERROR] Failed making request (1): %s", err)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
defer newresp.Body.Close()
body, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading body (1): %s", err)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
if newresp.StatusCode != 200 {
log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body))
if strings.Contains(string(body), "Workflowexecution is already finished") {
log.Printf("[DEBUG] Shutting down (19)")
shutdown(workflowExecution, "", "", true)
}
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
err = json.Unmarshal(body, &workflowExecution)
if err != nil {
log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
log.Printf("[INFO][%s] Workflow execution is finished. Exiting worker.", workflowExecution.ExecutionId)
log.Printf("[DEBUG] Shutting down (20)")
//handle workerssssssssss
if os.Getenv("IS_KUBERNETES") == "true" {
// log.Printf("workflow execution: %#v", workflowExecution)
clientset, err := getKubernetesClient()
if err != nil {
fmt.Println("[ERROR]Error getting kubernetes client:", err)
os.Exit(1)
}
cleanupExecution(clientset, workflowExecution, "shuffle")
} else {
shutdown(workflowExecution, "", "", true)
}
}
log.Printf("[INFO][%s] Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)
if workflowExecution.Status != "EXECUTING" {
log.Printf("[WARNING][%s] Exiting as worker execution has status %s!", workflowExecution.ExecutionId, workflowExecution.Status)
log.Printf("[DEBUG] Shutting down (21)")
if os.Getenv("IS_KUBERNETES") == "true" {
// log.Printf("workflow execution: %#v", workflowExecution)
clientset, err := getKubernetesClient()
if err != nil {
fmt.Println("[ERROR]Error getting kubernetes client:", err)
os.Exit(1)
}
cleanupExecution(clientset, workflowExecution, "shuffle")
} else {
shutdown(workflowExecution, "", "", true)
}
}
setWorkflowExecution(ctx, workflowExecution, false)
//handleExecutionResult(workflowExecution)
}
return nil
}
func arrayContains(visited []string, id string) bool {
found := false
for _, item := range visited {
if item == id {
found = true
break
}
}
return found
}
func getResult(workflowExecution shuffle.WorkflowExecution, id string) shuffle.ActionResult {
for _, actionResult := range workflowExecution.Results {
if actionResult.Action.ID == id {
return actionResult
}
}
return shuffle.ActionResult{}
}
func getAction(workflowExecution shuffle.WorkflowExecution, id, environment string) shuffle.Action {
for _, action := range workflowExecution.Workflow.Actions {
if action.ID == id {
return action
}
}
for _, trigger := range workflowExecution.Workflow.Triggers {
if trigger.ID == id {
return shuffle.Action{
ID: trigger.ID,
AppName: trigger.AppName,
Name: trigger.AppName,
Environment: environment,
Label: trigger.Label,
}
log.Printf("FOUND TRIGGER: %#v!", trigger)
}
}
return shuffle.Action{}
}
func runSkipAction(client *http.Client, action shuffle.Action, workflowId, workflowExecutionId, authorization string, configuration string) error {
timeNow := time.Now().Unix()
result := shuffle.ActionResult{
Action: action,
ExecutionId: workflowExecutionId,
Authorization: authorization,
Result: configuration,
StartedAt: timeNow,
CompletedAt: 0,
Status: "SUCCESS",
}
resultData, err := json.Marshal(result)
if err != nil {
return err
}
streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
req, err := http.NewRequest(
"POST",
streamUrl,
bytes.NewBuffer([]byte(resultData)),
)
if err != nil {
log.Printf("[WARNING] Error building skip request (0): %s", err)
return err
}
newresp, err := client.Do(req)
if err != nil {
log.Printf("[WARNING] Error running skip request (0): %s", err)
return err
}
defer newresp.Body.Close()
body, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[WARNING] Failed reading body when skipping (0): %s", err)
return err
}
log.Printf("[INFO] Skip Action Body: %s", string(body))
return nil
}
// Sends request back to backend to handle the node
func runUserInput(client *http.Client, action shuffle.Action, workflowId string, workflowExecution shuffle.WorkflowExecution, authorization string, configuration string, dockercli *dockerclient.Client) error {
timeNow := time.Now().Unix()
result := shuffle.ActionResult{
Action: action,
ExecutionId: workflowExecution.ExecutionId,
Authorization: authorization,
Result: configuration,
StartedAt: timeNow,
CompletedAt: 0,
Status: "WAITING",
}
// Checking for userinput to deploy subflow for it
subflow := false
subflowId := ""
argument := ""
continueUrl := "testing continue"
cancelUrl := "testing cancel"
for _, item := range action.Parameters {
if item.Name == "subflow" {
subflow = true
subflowId = item.Value
} else if item.Name == "alertinfo" {
argument = item.Value
}
}
if subflow {
log.Printf("[DEBUG] Should run action with subflow app with argument %#v", argument)
newAction := shuffle.Action{
AppName: "shuffle-subflow",
Name: "run_subflow",
AppVersion: "1.0.0",
Label: "User Input Subflow Execution",
}
identifier := fmt.Sprintf("%s_%s_%s_%s", newAction.AppName, newAction.AppVersion, action.ID, workflowExecution.ExecutionId)
if strings.Contains(identifier, " ") {
identifier = strings.ReplaceAll(identifier, " ", "-")
}
inputValue := UserInputSubflow{
Argument: argument,
ContinueUrl: continueUrl,
CancelUrl: cancelUrl,
}
parsedArgument, err := json.Marshal(inputValue)
if err != nil {
log.Printf("[ERROR] Failed to parse arguments: %s", err)
parsedArgument = []byte(argument)
}
newAction.Parameters = []shuffle.WorkflowAppActionParameter{
shuffle.WorkflowAppActionParameter{
Name: "user_apikey",
Value: workflowExecution.Authorization,
},
shuffle.WorkflowAppActionParameter{
Name: "workflow",
Value: subflowId,
},
shuffle.WorkflowAppActionParameter{
Name: "argument",
Value: string(parsedArgument),
},
}
newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{
Name: "source_workflow",
Value: workflowExecution.Workflow.ID,
})
newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{
Name: "source_execution",
Value: workflowExecution.ExecutionId,
})
newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{
Name: "source_node",
Value: action.ID,
})
newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{
Name: "source_auth",
Value: workflowExecution.Authorization,
})
newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{
Name: "startnode",
Value: "",
})
// If cleanup is set, it should run for efficiency
//appName := strings.Replace(identifier, fmt.Sprintf("_%s", action.ID), "", -1)
//appName = strings.Replace(appName, fmt.Sprintf("_%s", workflowExecution.ExecutionId), "", -1)
actionData, err := json.Marshal(newAction)
if err != nil {
return err
}
env := []string{
fmt.Sprintf("ACTION=%s", string(actionData)),
fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
fmt.Sprintf("TZ=%s", timezone),
fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", os.Getenv("SHUFFLE_LOGS_DISABLED")),
}
if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
//log.Printf("APPENDING PROXY TO THE APP!")
env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
}
err = deployApp(dockercli, "frikky/shuffle:shuffle-subflow_1.0.0", identifier, env, workflowExecution, newAction)
if err != nil {
log.Printf("[ERROR] Failed to deploy subflow for user input trigger %s: %s", action.ID, err)
}
} else {
log.Printf("[DEBUG] Running user input WITHOUT subflow")
}
resultData, err := json.Marshal(result)
if err != nil {
return err
}
streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
req, err := http.NewRequest(
"POST",
streamUrl,
bytes.NewBuffer([]byte(resultData)),
)
if err != nil {
log.Printf("[WARNING] Error building test request (2): %s", err)
return err
}
newresp, err := client.Do(req)
if err != nil {
log.Printf("[WARNING] Error running test request (2): %s", err)
return err
}
defer newresp.Body.Close()
body, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("Failed reading body when waiting: %s", err)
return err
}
log.Printf("[INFO] User Input Body: %s", string(body))
return nil
}
func runTestExecution(client *http.Client, workflowId, apikey string) (string, string) {
executeUrl := fmt.Sprintf("%s/api/v1/workflows/%s/execute", baseUrl, workflowId)
req, err := http.NewRequest(
"GET",
executeUrl,
nil,
)
if err != nil {
log.Printf("Error building test request: %s", err)
return "", ""
}
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", apikey))
newresp, err := client.Do(req)
if err != nil {
log.Printf("[WARNING] Error running test request (3): %s", err)
return "", ""
}
defer newresp.Body.Close()
body, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[WARNING] Failed reading body: %s", err)
return "", ""
}
log.Printf("[INFO] Test Body: %s", string(body))
var workflowExecution shuffle.WorkflowExecution
err = json.Unmarshal(body, &workflowExecution)
if err != nil {
log.Printf("Failed workflowExecution unmarshal: %s", err)
return "", ""
}
return workflowExecution.Authorization, workflowExecution.ExecutionId
}
func handleWorkflowQueue(resp http.ResponseWriter, request *http.Request) {
if request.Body == nil {
resp.WriteHeader(http.StatusBadRequest)
return
}
body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("[WARNING] (3) Failed reading body for workflowqueue")
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
defer request.Body.Close()
var actionResult shuffle.ActionResult
err = json.Unmarshal(body, &actionResult)
if err != nil {
log.Printf("[ERROR] Failed shuffle.ActionResult unmarshaling (2): %s", err)
//resp.WriteHeader(401)
//resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
//return
}
if len(actionResult.ExecutionId) == 0 {
log.Printf("[WARNING] No workflow execution id in action result. Data: %s", string(body))
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
return
}
// 1. Get the shuffle.WorkflowExecution(ExecutionId) from the database
// 2. if shuffle.ActionResult.Authentication != shuffle.WorkflowExecution.Authentication -> exit
// 3. Add to and update actionResult in workflowExecution
// 4. Push to db
// IF FAIL: Set executionstatus: abort or cancel
ctx := context.Background()
workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
if err != nil {
log.Printf("[ERROR][%s] Failed getting execution (workflowqueue) %s: %s", actionResult.ExecutionId, actionResult.ExecutionId, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution ID %s because it doesn't exist locally."}`, actionResult.ExecutionId)))
return
}
if workflowExecution.Authorization != actionResult.Authorization {
log.Printf("[INFO] Bad authorization key when updating node (workflowQueue) %s. Want: %s, Have: %s", actionResult.ExecutionId, workflowExecution.Authorization, actionResult.Authorization)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key"}`)))
return
}
if workflowExecution.Status == "FINISHED" {
log.Printf("[DEBUG] Workflowexecution is already FINISHED. No further action can be taken")
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is already finished because it has status %s. Lastnode: %s"}`, workflowExecution.Status, workflowExecution.LastNode)))
return
}
if workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
if workflowExecution.Workflow.Configuration.ExitOnError {
log.Printf("[WARNING] Workflowexecution already has status %s. No further action can be taken", workflowExecution.Status)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is aborted because of %s with result %s and status %s"}`, workflowExecution.LastNode, workflowExecution.Result, workflowExecution.Status)))
return
} else {
log.Printf("Continuing even though it's aborted.")
}
}
log.Printf("[INFO][%s] Got result '%s' from '%s' with app '%s':'%s'", actionResult.ExecutionId, actionResult.Status, actionResult.Action.Label, actionResult.Action.AppName, actionResult.Action.AppVersion)
//results = append(results, actionResult)
//log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))
//log.Printf("[DEBUG][%s] In workflowQueue with transaction", workflowExecution.ExecutionId)
runWorkflowExecutionTransaction(ctx, 0, workflowExecution.ExecutionId, actionResult, resp)
}
// Will make sure transactions are always ran for an execution. This is recursive if it fails. Allowed to fail up to 5 times
func runWorkflowExecutionTransaction(ctx context.Context, attempts int64, workflowExecutionId string, actionResult shuffle.ActionResult, resp http.ResponseWriter) {
//log.Printf("[DEBUG][%s] IN WORKFLOWEXECUTION SUB!", actionResult.ExecutionId)
workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
if err != nil {
log.Printf("[ERROR] Failed getting execution cache: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
return
}
resultLength := len(workflowExecution.Results)
setExecution := true
workflowExecution, dbSave, err := shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, true, 0)
if err != nil {
log.Printf("[DEBUG] Rerunning transaction? %s", err)
if strings.Contains(fmt.Sprintf("%s", err), "Rerun this transaction") {
workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId)
if err != nil {
log.Printf("[ERROR] Failed getting execution cache (2): %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`)))
return
}
resultLength = len(workflowExecution.Results)
setExecution = true
workflowExecution, dbSave, err = shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, false, 0)
if err != nil {
log.Printf("[ERROR] Failed execution of parsedexecution (2): %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`)))
return
} else {
log.Printf("[DEBUG] Successfully got ParsedExecution with %d results!", len(workflowExecution.Results))
}
} else {
log.Printf("[ERROR] Failed execution of parsedexecution: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`)))
return
}
}
//log.Printf(`[DEBUG][%s] Got result %s from %s. Execution status: %s. Save: %#v. Parent: %#v`, actionResult.ExecutionId, actionResult.Status, actionResult.Action.ID, workflowExecution.Status, dbSave, workflowExecution.ExecutionParent)
//dbSave := false
//if len(results) != len(workflowExecution.Results) {
// log.Printf("[DEBUG][%s] There may have been an issue in transaction queue. Result lengths: %d vs %d. Should check which exists the base results, but not in entire execution, then append.", workflowExecution.ExecutionId, len(results), len(workflowExecution.Results))
//}
// Validating that action results hasn't changed
// Handled using cachhing, so actually pretty fast
cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
cache, err := shuffle.GetCache(ctx, cacheKey)
if err == nil {
//parsedValue := value.(*shuffle.WorkflowExecution)
parsedValue := &shuffle.WorkflowExecution{}
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &workflowExecution)
if err != nil {
log.Printf("[ERROR] Failed unmarshalling workflowexecution: %s", err)
}
if len(parsedValue.Results) > 0 && len(parsedValue.Results) != resultLength {
setExecution = false
if attempts > 5 {
//log.Printf("\n\nSkipping execution input - %d vs %d. Attempts: (%d)\n\n", len(parsedValue.Results), resultLength, attempts)
}
attempts += 1
if len(workflowExecution.Results) <= len(workflowExecution.Workflow.Actions) {
runWorkflowExecutionTransaction(ctx, attempts, workflowExecutionId, actionResult, resp)
return
}
}
}
/*
if value, found := requestCache.Get(cacheKey); found {
parsedValue := value.(*shuffle.WorkflowExecution)
if len(parsedValue.Results) > 0 && len(parsedValue.Results) != resultLength {
setExecution = false
if attempts > 5 {
//log.Printf("\n\nSkipping execution input - %d vs %d. Attempts: (%d)\n\n", len(parsedValue.Results), resultLength, attempts)
}
attempts += 1
if len(workflowExecution.Results) <= len(workflowExecution.Workflow.Actions) {
runWorkflowExecutionTransaction(ctx, attempts, workflowExecutionId, actionResult, resp)
return
}
}
}
*/
if setExecution || workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" {
log.Printf("[DEBUG][%s] Running setexec with status %s and %d results", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results))
err = setWorkflowExecution(ctx, *workflowExecution, dbSave)
if err != nil {
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed setting workflowexecution actionresult: %s"}`, err)))
return
}
} else {
log.Printf("[INFO][%s] Skipping setexec with status %s", workflowExecution.ExecutionId, workflowExecution.Status)
// Just in case. Should MAYBE validate finishing another time as well.
// This fixes issues with e.g. shuffle.Action -> shuffle.Trigger -> shuffle.Action.
handleExecutionResult(*workflowExecution)
//validateFinished(workflowExecution)
}
//if newExecutions && len(nextActions) > 0 {
// log.Printf("[DEBUG][%s] New execution: %#v. NextActions: %#v", newExecutions, nextActions)
// //handleExecutionResult(*workflowExecution)
//}
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
}
func sendSelfRequest(actionResult shuffle.ActionResult) {
log.Printf("[INFO][%s] Not sending backend info since source is default (not swarm)", actionResult.ExecutionId)
return
data, err := json.Marshal(actionResult)
if err != nil {
log.Printf("[ERROR][%s] Shutting down (24): Failed to unmarshal data for backend: %s", actionResult.ExecutionId, err)
return
}
if actionResult.ExecutionId == "TBD" {
return
}
log.Printf("[DEBUG][%s] Sending FAILURE to self to stop the workflow execution. Action: %s (%s), app %s:%s", actionResult.ExecutionId, actionResult.Action.Label, actionResult.Action.ID, actionResult.Action.AppName, actionResult.Action.AppVersion)
// Literally sending to same worker to run it as a new request
streamUrl := fmt.Sprintf("http://localhost:33333/api/v1/streams")
hostenv := os.Getenv("WORKER_HOSTNAME")
if len(hostenv) > 0 {
streamUrl = fmt.Sprintf("http://%s:33333/api/v1/streams", hostenv)
}
req, err := http.NewRequest(
"POST",
streamUrl,
bytes.NewBuffer([]byte(data)),
)
if err != nil {
log.Printf("[ERROR][%s] Failed creating self request (1): %s", actionResult.ExecutionId, err)
return
}
newresp, err := topClient.Do(req)
if err != nil {
log.Printf("[ERROR][%s] Error running self request (2): %s", actionResult.ExecutionId, err)
return
}
defer newresp.Body.Close()
if newresp.Body != nil {
body, err := ioutil.ReadAll(newresp.Body)
//log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode)
if err != nil {
log.Printf("[ERROR][%s] Failed reading self request body: %s", actionResult.ExecutionId, err)
} else {
log.Printf("[DEBUG][%s] NEWRESP (from self - 1): %s", actionResult.ExecutionId, string(body))
}
}
}
func sendResult(workflowExecution shuffle.WorkflowExecution, data []byte) {
if workflowExecution.ExecutionSource == "default" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" {
//log.Printf("[INFO][%s] Not sending backend info since source is default (not swarm)", workflowExecution.ExecutionId)
//return
}
streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
req, err := http.NewRequest(
"POST",
streamUrl,
bytes.NewBuffer([]byte(data)),
)
if err != nil {
log.Printf("[ERROR][%s] Failed creating finishing request: %s", workflowExecution.ExecutionId, err)
log.Printf("[DEBUG][%s] Shutting down (22)", workflowExecution.ExecutionId)
shutdown(workflowExecution, "", "", false)
return
}
newresp, err := topClient.Do(req)
if err != nil {
log.Printf("[ERROR][%s] Error running finishing request: %s", workflowExecution.ExecutionId, err)
log.Printf("[DEBUG][%s] Shutting down (23)", workflowExecution.ExecutionId)
shutdown(workflowExecution, "", "", false)
return
}
defer newresp.Body.Close()
if newresp.Body != nil {
body, err := ioutil.ReadAll(newresp.Body)
//log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode)
if err != nil {
log.Printf("[ERROR][%s] Failed reading body: %s", workflowExecution.ExecutionId, err)
} else {
log.Printf("[DEBUG][%s] NEWRESP (from backend): %s", workflowExecution.ExecutionId, string(body))
}
}
}
func validateFinished(workflowExecution shuffle.WorkflowExecution) bool {
ctx := context.Background()
newexec, err := shuffle.GetWorkflowExecution(ctx, workflowExecution.ExecutionId)
if err != nil {
log.Printf("[ERROR][%s] Failed getting workflow execution: %s", workflowExecution.ExecutionId, err)
} else {
workflowExecution = *newexec
}
//startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
workflowExecution = shuffle.Fixexecution(ctx, workflowExecution)
_, extra, _, _, _, _, _, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
log.Printf("[INFO][%s] VALIDATION. Status: %s, shuffle.Actions: %d, Extra: %d, Results: %d. Parent: %#v\n", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), extra, len(workflowExecution.Results), workflowExecution.ExecutionParent)
//if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
if (len(environments) == 1 && requestsSent == 0 && len(workflowExecution.Results) >= 1 && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm") || (len(workflowExecution.Results) >= len(workflowExecution.Workflow.Actions)+extra && len(workflowExecution.Workflow.Actions) > 0) {
if workflowExecution.Status == "FINISHED" {
for _, result := range workflowExecution.Results {
if result.Status == "EXECUTING" || result.Status == "WAITING" {
log.Printf("[WARNING] NOT returning full result, as a result may be unfinished: %s (%s) - %s", result.Action.Label, result.Action.ID, result.Status)
return false
}
}
}
requestsSent += 1
log.Printf("[DEBUG][%s] Should send full result to %s", workflowExecution.ExecutionId, baseUrl)
//data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
shutdownData, err := json.Marshal(workflowExecution)
if err != nil {
log.Printf("[ERROR][%s] Shutting down (32): Failed to unmarshal data for backend: %s", workflowExecution.ExecutionId, err)
shutdown(workflowExecution, "", "", true)
}
cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
err = shuffle.SetCache(ctx, cacheKey, shutdownData, 30)
if err != nil {
log.Printf("[ERROR][%s] Failed adding to cache during validateFinished", workflowExecution)
}
shuffle.RunCacheCleanup(ctx, workflowExecution)
sendResult(workflowExecution, shutdownData)
return true
}
return false
}
func handleGetStreamResults(resp http.ResponseWriter, request *http.Request) {
body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("[WARNING] Failed reading body for stream result queue")
resp.WriteHeader(500)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
defer request.Body.Close()
//log.Printf("[DEBUG] In get stream results with body length %d: %s", len(body), string(body))
var actionResult shuffle.ActionResult
err = json.Unmarshal(body, &actionResult)
if err != nil {
log.Printf("[WARNING] Failed shuffle.ActionResult unmarshaling: %s", err)
//resp.WriteHeader(400)
//resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
//return
}
if len(actionResult.ExecutionId) == 0 {
log.Printf("[WARNING] No workflow execution id in action result (2). Data: %s", string(body))
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
return
}
ctx := context.Background()
workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
if err != nil {
log.Printf("[INFO] Failed getting execution (streamresult) %s: %s", actionResult.ExecutionId, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
return
}
// Authorization is done here
if workflowExecution.Authorization != actionResult.Authorization {
log.Printf("Bad authorization key when getting stream results %s.", actionResult.ExecutionId)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
return
}
newjson, err := json.Marshal(workflowExecution)
if err != nil {
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow execution"}`)))
return
}
resp.WriteHeader(200)
resp.Write(newjson)
}
func setWorkflowExecution(ctx context.Context, workflowExecution shuffle.WorkflowExecution, dbSave bool) error {
if len(workflowExecution.ExecutionId) == 0 {
log.Printf("[DEBUG] Workflowexecution executionId can't be empty.")
return errors.New("ExecutionId can't be empty.")
}
//log.Printf("[DEBUG][%s] Setting with %d results (pre)", workflowExecution.ExecutionId, len(workflowExecution.Results))
workflowExecution = shuffle.Fixexecution(ctx, workflowExecution)
//log.Printf("[DEBUG][%s] Setting with %d results (post)", workflowExecution.ExecutionId, len(workflowExecution.Results))
cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
execData, err := json.Marshal(workflowExecution)
if err != nil {
log.Printf("[ERROR] Failed marshalling execution during set: %s", err)
return err
}
err = shuffle.SetCache(ctx, cacheKey, execData, 30)
if err != nil {
log.Printf("[ERROR][%s] Failed adding to cache during setexecution", workflowExecution)
return err
}
//requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration)
handleExecutionResult(workflowExecution)
validateFinished(workflowExecution)
// FIXME: Should this shutdown OR send the result?
// The worker may not be running the backend hmm
if dbSave {
if workflowExecution.ExecutionSource == "default" {
log.Printf("[DEBUG][%s] Shutting down (25)", workflowExecution.ExecutionId)
shutdown(workflowExecution, "", "", true)
//return
} else {
log.Printf("[DEBUG] NOT shutting down with dbSave (%s)", workflowExecution.ExecutionSource)
}
}
return nil
}
// GetLocalIP returns the non loopback local IP of the host
func getLocalIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return ""
}
for _, address := range addrs {
// check the address type and if it is not a loopback the display it
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
return ""
}
func getAvailablePort() (net.Listener, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
log.Printf("[WARNING] Failed to assign port by default. Defaulting to 5001")
//return ":5001"
return nil, err
}
//defer listener.Close()
return listener, nil
//return fmt.Sprintf(":%d", port)
}
func webserverSetup(workflowExecution shuffle.WorkflowExecution) net.Listener {
hostname = getLocalIP()
os.Setenv("WORKER_HOSTNAME", hostname)
// FIXME: This MAY not work because of speed between first
// container being launched and port being assigned to webserver
listener, err := getAvailablePort()
if err != nil {
log.Printf("[ERROR] Failed to create init listener: %s", err)
return listener
}
log.Printf("[DEBUG] OLD HOSTNAME: %s", appCallbackUrl)
port := listener.Addr().(*net.TCPAddr).Port
log.Printf("\n\n[DEBUG] Starting webserver (2) on port %d with hostname: %s\n\n", port, hostname)
appCallbackUrl = fmt.Sprintf("http://%s:%d", hostname, port)
log.Printf("[INFO] NEW WORKER HOSTNAME: %s", appCallbackUrl)
return listener
}
func downloadDockerImageBackend(client *http.Client, imageName string) error {
log.Printf("[DEBUG] Trying to download image %s from backend %s as it doesn't exist. All images: %#v", imageName, baseUrl, downloadedImages)
if arrayContains(downloadedImages, imageName) {
log.Printf("[DEBUG] Image %s already downloaded", imageName)
return nil
}
downloadedImages = append(downloadedImages, imageName)
data := fmt.Sprintf(`{"name": "%s"}`, imageName)
dockerImgUrl := fmt.Sprintf("%s/api/v1/get_docker_image", baseUrl)
req, err := http.NewRequest(
"POST",
dockerImgUrl,
bytes.NewBuffer([]byte(data)),
)
authorization := os.Getenv("AUTHORIZATION")
if len(authorization) > 0 {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
} else {
log.Printf("[WARNING] No auth found - running backend download without it.")
//return
}
newresp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR] Failed download request for %s: %s", imageName, err)
return err
}
defer newresp.Body.Close()
if newresp.StatusCode != 200 {
log.Printf("[ERROR] Docker download for image %s (backend) StatusCode (1): %d", imageName, newresp.StatusCode)
return errors.New(fmt.Sprintf("Failed to get image - status code %d", newresp.StatusCode))
}
newImageName := strings.Replace(imageName, "/", "_", -1)
newFileName := newImageName + ".tar"
tar, err := os.Create(newFileName)
if err != nil {
log.Printf("[WARNING] Failed creating file: %s", err)
return err
}
defer tar.Close()
_, err = io.Copy(tar, newresp.Body)
if err != nil {
log.Printf("[WARNING] Failed response body copying: %s", err)
return err
}
tar.Seek(0, 0)
dockercli, err := dockerclient.NewEnvClient()
if err != nil {
log.Printf("[ERROR] Unable to create docker client (3): %s", err)
return err
}
imageLoadResponse, err := dockercli.ImageLoad(context.Background(), tar, true)
if err != nil {
log.Printf("[ERROR] Error loading images: %s", err)
return err
}
body, err := ioutil.ReadAll(imageLoadResponse.Body)
if err != nil {
log.Printf("[ERROR] Error reading: %s", err)
return err
}
if strings.Contains(string(body), "no such file") {
return errors.New(string(body))
}
baseTag := strings.Split(imageName, ":")
if len(baseTag) > 1 {
tag := baseTag[1]
log.Printf("[DEBUG] Creating tag copies of downloaded containers from tag %s", tag)
// Remapping
ctx := context.Background()
dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
downloadedImages = append(downloadedImages, fmt.Sprintf("frikky/shuffle:%s", tag))
downloadedImages = append(downloadedImages, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
}
os.Remove(newFileName)
log.Printf("[INFO] Successfully loaded image %s: %s", imageName, string(body))
return nil
}
// Initial loop etc
func main() {
// Elasticsearch necessary to ensure we'ren ot running with Datastore configurations for minimal/maximal data sizes
_, err := shuffle.RunInit(datastore.Client{}, storage.Client{}, "", "worker", true, "elasticsearch")
if err != nil {
log.Printf("[ERROR] Failed to run worker init: %s", err)
} else {
log.Printf("[DEBUG] Ran init for worker to set up cache system. Docker version: %s", dockerApiVersion)
}
log.Printf("[INFO] Setting up worker environment")
sleepTime := 5
client := shuffle.GetExternalClient(baseUrl)
if timezone == "" {
timezone = "Europe/Amsterdam"
}
log.Printf("[INFO] Running with timezone %s and swarm config %#v", timezone, os.Getenv("SHUFFLE_SWARM_CONFIG"))
authorization := ""
executionId := ""
// INFO: Allows you to run a test execution
testing := os.Getenv("WORKER_TESTING_WORKFLOW")
shuffle_apikey := os.Getenv("WORKER_TESTING_APIKEY")
if len(testing) > 0 && len(shuffle_apikey) > 0 {
// Execute a workflow and use that info
log.Printf("[WARNING] Running test environment for worker by executing workflow %s. PS: This may NOT reach the worker in real time, but rather be deployed as a docker container (bad). Instead use AUTHORIZATION and EXECUTIONID for direct testing", testing)
authorization, executionId = runTestExecution(client, testing, shuffle_apikey)
} else {
authorization = os.Getenv("AUTHORIZATION")
executionId = os.Getenv("EXECUTIONID")
log.Printf("[INFO] Running normal execution with auth %s and ID %s", authorization, executionId)
}
workflowExecution := shuffle.WorkflowExecution{
ExecutionId: executionId,
}
if len(authorization) == 0 {
log.Printf("[INFO] No AUTHORIZATION key set in env")
log.Printf("[DEBUG] Shutting down (27)")
shutdown(workflowExecution, "", "", false)
}
if len(executionId) == 0 {
log.Printf("[INFO] No EXECUTIONID key set in env")
log.Printf("[DEBUG] Shutting down (28)")
shutdown(workflowExecution, "", "", false)
}
data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
req, err := http.NewRequest(
"POST",
streamResultUrl,
bytes.NewBuffer([]byte(data)),
)
if err != nil {
log.Printf("[ERROR] Failed making request builder for backend")
log.Printf("[DEBUG] Shutting down (29)")
shutdown(workflowExecution, "", "", true)
}
topClient = client
firstRequest := true
environments := []string{}
for {
// Because of this, it always has updated data.
// Removed request requirement from app_sdk
newresp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR] Failed request: %s", err)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
defer newresp.Body.Close()
body, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading body: %s", err)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
if newresp.StatusCode != 200 {
log.Printf("[ERROR] %s\nStatusCode (1): %d", string(body), newresp.StatusCode)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
err = json.Unmarshal(body, &workflowExecution)
if err != nil {
log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
time.Sleep(time.Duration(sleepTime) * time.Second)
continue
}
if firstRequest {
firstRequest = false
//workflowExecution.StartedAt = int64(time.Now().Unix())
ctx := context.Background()
cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
execData, err := json.Marshal(workflowExecution)
if err != nil {
log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err)
} else {
err = shuffle.SetCache(ctx, cacheKey, execData, 30)
if err != nil {
log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err)
}
}
//requestCache = cache.New(60*time.Minute, 120*time.Minute)
//requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration)
for _, action := range workflowExecution.Workflow.Actions {
found := false
for _, environment := range environments {
if action.Environment == environment {
found = true
break
}
}
if !found {
environments = append(environments, action.Environment)
}
}
// Checks if a subflow is child of the startnode, as sub-subflows aren't working properly yet
childNodes := shuffle.FindChildNodes(workflowExecution, workflowExecution.Start, []string{}, []string{})
log.Printf("[DEBUG] Looking for subflow in %#v to check execution pattern as child of %s", childNodes, workflowExecution.Start)
subflowFound := false
for _, childNode := range childNodes {
for _, trigger := range workflowExecution.Workflow.Triggers {
if trigger.ID != childNode {
continue
}
if trigger.AppName == "Shuffle Workflow" {
subflowFound = true
break
}
}
if subflowFound {
break
}
}
log.Printf("\n\nEnvironments: %s. Source: %s. 1 env = webserver, 0 or >1 = default. Subflow exists: %#v\n\n", environments, workflowExecution.ExecutionSource, subflowFound)
if len(environments) == 1 && workflowExecution.ExecutionSource != "default" && !subflowFound {
log.Printf("\n\n[DEBUG] Running OPTIMIZED execution (not manual)\n\n")
listener := webserverSetup(workflowExecution)
err := executionInit(workflowExecution)
if err != nil {
log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err)
log.Printf("[DEBUG] Shutting down (30)")
shutdown(workflowExecution, "", "", true)
}
go func() {
time.Sleep(time.Duration(1))
handleExecutionResult(workflowExecution)
}()
runWebserver(listener)
//log.Printf("Before wait")
//wg := sync.WaitGroup{}
//wg.Add(1)
//wg.Wait()
} else {
log.Printf("\n\n[DEBUG] Running NON-OPTIMIZED execution for type %s with %d environment(s). This only happens when ran manually OR when running with subflows. Status: %s\n\n", workflowExecution.ExecutionSource, len(environments), workflowExecution.Status)
err := executionInit(workflowExecution)
if err != nil {
log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err)
shutdown(workflowExecution, "", "", true)
}
// Trying to make worker into microservice~ :)
}
}
if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
log.Printf("[DEBUG] Workflow %s is finished. Exiting worker.", workflowExecution.ExecutionId)
log.Printf("[DEBUG] Shutting down (31)")
shutdown(workflowExecution, "", "", true)
}
if workflowExecution.Status == "EXECUTING" || workflowExecution.Status == "RUNNING" {
//log.Printf("Status: %s", workflowExecution.Status)
err = handleDefaultExecution(client, req, workflowExecution)
if err != nil {
log.Printf("[DEBUG] Workflow %s is finished: %s", workflowExecution.ExecutionId, err)
log.Printf("[DEBUG] Shutting down (32)")
shutdown(workflowExecution, "", "", true)
}
} else {
log.Printf("[DEBUG] Workflow %s has status %s. Exiting worker.", workflowExecution.ExecutionId, workflowExecution.Status)
log.Printf("[DEBUG] Shutting down (33)")
shutdown(workflowExecution, workflowExecution.Workflow.ID, "", true)
}
time.Sleep(time.Duration(sleepTime) * time.Second)
}
}
func checkUnfinished(resp http.ResponseWriter, request *http.Request, execRequest shuffle.OrborusExecutionRequest) {
// Meant as a function that periodically checks whether previous executions have finished or not.
// Should probably be based on executedIds and finishedIds
// Schedule a check in the future instead?
ctx := context.Background()
exec, err := shuffle.GetWorkflowExecution(ctx, execRequest.ExecutionId)
log.Printf("[DEBUG][%s] Rechecking execution and it's status to send to backend IF the status is EXECUTING (%s - %d/%d finished)", execRequest.ExecutionId, exec.Status, len(exec.Results), len(exec.Workflow.Actions))
if err != nil {
return
}
// FIXMe: Does this create issue with infinite loops?
// Usually caused by issue during startup
if exec.Status == "" {
//handleRunExecution(resp, request)
return
}
if exec.Status != "EXECUTING" {
return
}
log.Printf("[DEBUG][%s] Should send full result for execution to backend as it has %d results. Status: %s", execRequest.ExecutionId, len(exec.Results), exec.Status)
data, err := json.Marshal(exec)
if err != nil {
return
}
sendResult(*exec, data)
}
func handleRunExecution(resp http.ResponseWriter, request *http.Request) {
body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("[WARNING] Failed reading body for stream result queue")
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
defer request.Body.Close()
//log.Printf("[DEBUG] In run execution with body length %d", len(body))
var execRequest shuffle.OrborusExecutionRequest
err = json.Unmarshal(body, &execRequest)
if err != nil {
log.Printf("[WARNING] Failed shuffle.WorkflowExecution unmarshaling: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
// Checks if a workflow is done 30 seconds later, and sends info to backend no matter what
go func() {
time.Sleep(time.Duration(30) * time.Second)
checkUnfinished(resp, request, execRequest)
}()
// FIXME: This should be PER EXECUTION
//if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
// Is it ok if these are standard? Should they be update-able after launch? Hmm
if len(execRequest.HTTPProxy) > 0 {
log.Printf("[DEBUG] Sending proxy info to child process")
os.Setenv("SHUFFLE_PASS_APP_PROXY", execRequest.ShufflePassProxyToApp)
}
if len(execRequest.HTTPProxy) > 0 {
log.Printf("[DEBUG] Running with default HTTP proxy %s", execRequest.HTTPProxy)
os.Setenv("HTTP_PROXY", execRequest.HTTPProxy)
}
if len(execRequest.HTTPSProxy) > 0 {
log.Printf("[DEBUG] Running with default HTTPS proxy %s", execRequest.HTTPSProxy)
os.Setenv("HTTPS_PROXY", execRequest.HTTPSProxy)
}
if len(execRequest.EnvironmentName) > 0 {
os.Setenv("ENVIRONMENT_NAME", execRequest.EnvironmentName)
environment = execRequest.EnvironmentName
}
if len(execRequest.Timezone) > 0 {
os.Setenv("TZ", execRequest.Timezone)
timezone = execRequest.Timezone
}
if len(execRequest.Cleanup) > 0 {
os.Setenv("CLEANUP", execRequest.Cleanup)
cleanupEnv = execRequest.Cleanup
}
if len(execRequest.BaseUrl) > 0 {
os.Setenv("BASE_URL", execRequest.BaseUrl)
baseUrl = execRequest.BaseUrl
}
// Setting to just have an auth available.
if len(execRequest.Authorization) > 0 && len(os.Getenv("AUTHORIZATION")) == 0 {
//log.Printf("[DEBUG] Sending proxy info to child process")
os.Setenv("AUTHORIZATION", execRequest.Authorization)
}
topClient = &http.Client{}
var workflowExecution shuffle.WorkflowExecution
data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, execRequest.ExecutionId, execRequest.Authorization)
streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)
req, err := http.NewRequest(
"POST",
streamResultUrl,
bytes.NewBuffer([]byte(data)),
)
newresp, err := topClient.Do(req)
if err != nil {
log.Printf("[ERROR] Failed making request (2): %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
body, err = ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[ERROR] Failed reading body (2): %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
if newresp.StatusCode != 200 {
log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body))
if strings.Contains(string(body), "Workflowexecution is already finished") {
log.Printf("[DEBUG] Shutting down (19)")
//shutdown(workflowExecution, "", "", true)
}
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad statuscode: %d"}`, newresp.StatusCode)))
return
}
err = json.Unmarshal(body, &workflowExecution)
if err != nil {
log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
return
}
ctx := context.Background()
err = setWorkflowExecution(ctx, workflowExecution, true)
if err != nil {
log.Printf("[ERROR] Failed initializing execution saving for %s: %s", workflowExecution.ExecutionId, err)
}
if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" {
log.Printf("[DEBUG] Workflow %s is finished. Exiting worker.", workflowExecution.ExecutionId)
log.Printf("[DEBUG] Shutting down (20)")
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status for execution - already %s. Returning with 200 OK"}`, workflowExecution.Status)))
return
}
//startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId)
extra := 0
for _, trigger := range workflowExecution.Workflow.Triggers {
//log.Printf("Appname trigger (0): %s", trigger.AppName)
if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
extra += 1
}
}
log.Printf("[INFO][%s] Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)
if workflowExecution.Status != "EXECUTING" {
log.Printf("[WARNING] Exiting as worker execution has status %s!", workflowExecution.Status)
log.Printf("[DEBUG] Shutting down (21)")
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status %s for the workflow execution %s"}`, workflowExecution.Status, workflowExecution.ExecutionId)))
return
}
//log.Printf("[DEBUG] Starting execution :O")
cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
execData, err := json.Marshal(workflowExecution)
if err != nil {
log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err)
} else {
err = shuffle.SetCache(ctx, cacheKey, execData, 30)
if err != nil {
log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err)
}
}
//requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration)
err = executionInit(workflowExecution)
if err != nil {
log.Printf("[DEBUG][%s] Shutting down (30) - Workflow setup failed: %s", workflowExecution.ExecutionId, workflowExecution.ExecutionId, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Error in execution init: %s"}`, err)))
return
//shutdown(workflowExecution, "", "", true)
}
//go handleExecutionResult(workflowExecution)
handleExecutionResult(workflowExecution)
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true}`)))
}
func runWebserver(listener net.Listener) {
r := mux.NewRouter()
r.HandleFunc("/api/v1/streams", handleWorkflowQueue).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/streams/results", handleGetStreamResults).Methods("POST", "OPTIONS")
//log.Fatal(http.ListenAndServe(port, nil))
//srv := http.Server{
// Addr: ":8888",
// WriteTimeout: 1 * time.Second,
// Handler: http.HandlerFunc(slowHandler),
//}
//log.Fatal(http.Serve(listener, nil))
log.Printf("\n\n[DEBUG] NEW webserver setup\n\n")
http.Handle("/", r)
srv := http.Server{
Handler: r,
ReadTimeout: 60 * time.Second,
ReadHeaderTimeout: 60 * time.Second,
IdleTimeout: 60 * time.Second,
WriteTimeout: 60 * time.Second,
}
err := srv.Serve(listener)
if err != nil {
log.Printf("serveIssue: %#v", err)
}
log.Printf("[DEBUG] Do we see this?")
}