package main import ( "github.com/shuffle/shuffle-shared" //"bufio" "bytes" "context" "encoding/json" "errors" "fmt" "io" "io/ioutil" "log" "net" "net/http" "net/url" "os" "strings" "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" //"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/mount" dockerclient "github.com/docker/docker/client" //"github.com/go-git/go-billy/v5/memfs" //newdockerclient "github.com/fsouza/go-dockerclient" //"github.com/satori/go.uuid" "github.com/gorilla/mux" "github.com/patrickmn/go-cache" "github.com/satori/go.uuid" // No necessary outside shared "cloud.google.com/go/datastore" "cloud.google.com/go/storage" //k8s deps corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "path/filepath" // "k8s.io/client-go/util/retry" ) // This is getting out of hand :) var environment = os.Getenv("ENVIRONMENT_NAME") var baseUrl = os.Getenv("BASE_URL") var appCallbackUrl = os.Getenv("BASE_URL") var cleanupEnv = strings.ToLower(os.Getenv("CLEANUP")) var dockerApiVersion = strings.ToLower(os.Getenv("DOCKER_API_VERSION")) var swarmNetworkName = os.Getenv("SHUFFLE_SWARM_NETWORK_NAME") var timezone = os.Getenv("TZ") var baseimagename = "frikky/shuffle" // var baseimagename = "registry.hub.docker.com/frikky/shuffle" var registryName = "registry.hub.docker.com" var sleepTime = 2 var requestCache *cache.Cache var topClient *http.Client var data string var requestsSent = 0 var appsInitialized = false var hostname string /* var environments []string var parents map[string][]string var children map[string][]string var visited []string var executed []string var nextActions []string var extra int var startAction string */ //var results []shuffle.ActionResult //var allLogs map[string]string //var containerIds []string var downloadedImages []string // Images to 
be autodeployed in the latest version of Shuffle. var autoDeploy = map[string]string{ "http:1.3.0": "frikky/shuffle:http_1.3.0", "http:1.4.0": "frikky/shuffle:http_1.4.0", "shuffle-tools:1.2.0": "frikky/shuffle:shuffle-tools_1.2.0", "shuffle-subflow:1.0.0": "frikky/shuffle:shuffle-subflow_1.0.0", "shuffle-subflow:1.1.0": "frikky/shuffle:shuffle-subflow_1.1.0", } //"testing:1.0.0": "frikky/shuffle:testing_1.0.0", //fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID) // New Worker mappings // visited, appendActions, nextActions, notFound, queueNodes, toRemove, executed, env var portMappings map[string]int var baseport = 33333 type UserInputSubflow struct { Argument string `json:"execution_argument"` ContinueUrl string `json:"continue_url"` CancelUrl string `json:"cancel_url"` } // removes every container except itself (worker) func shutdown(workflowExecution shuffle.WorkflowExecution, nodeId string, reason string, handleResultSend bool) { log.Printf("[DEBUG][%s] Shutdown (%s) started with reason %#v. Result amount: %d. 
ResultsSent: %d, Send result: %#v, Parent: %#v", workflowExecution.ExecutionId, workflowExecution.Status, reason, len(workflowExecution.Results), requestsSent, handleResultSend, workflowExecution.ExecutionParent) //reason := "Error in execution" sleepDuration := 1 if handleResultSend && requestsSent < 2 { shutdownData, err := json.Marshal(workflowExecution) if err == nil { sendResult(workflowExecution, shutdownData) log.Printf("[WARNING][%s] Sent shutdown update with %d results and result value %s", workflowExecution.ExecutionId, len(workflowExecution.Results), reason) } else { log.Printf("[WARNING][%s] Failed to send update: %s", workflowExecution.ExecutionId, err) } time.Sleep(time.Duration(sleepDuration) * time.Second) } if len(reason) > 0 && len(nodeId) > 0 { //log.Printf("[INFO] Running abort of workflow because it should be finished") abortUrl := fmt.Sprintf("%s/api/v1/workflows/%s/executions/%s/abort", baseUrl, workflowExecution.Workflow.ID, workflowExecution.ExecutionId) path := fmt.Sprintf("?reason=%s", url.QueryEscape(reason)) if len(nodeId) > 0 { path += fmt.Sprintf("&node=%s", url.QueryEscape(nodeId)) } if len(environment) > 0 { path += fmt.Sprintf("&env=%s", url.QueryEscape(environment)) } //fmt.Printf(url.QueryEscape(query)) abortUrl += path log.Printf("[DEBUG][%s] Abort URL: %s", workflowExecution.ExecutionId, abortUrl) req, err := http.NewRequest( "GET", abortUrl, nil, ) if err != nil { log.Printf("[WARNING][%s] Failed building request: %s", workflowExecution.ExecutionId, err) } authorization := os.Getenv("AUTHORIZATION") if len(authorization) > 0 { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization)) } else { log.Printf("[ERROR][%s] No authorization specified for abort", workflowExecution.ExecutionId) } req.Header.Add("Content-Type", "application/json") client := shuffle.GetExternalClient(baseUrl) //log.Printf("[DEBUG][%s] All App Logs: %#v", workflowExecution.ExecutionId, allLogs) newresp, err := client.Do(req) if err != nil { 
log.Printf("[WARNING][%s] Failed abort request: %s", workflowExecution.ExecutionId, err) } else { defer newresp.Body.Close() } } else { //log.Printf("[INFO][%s] NOT running abort during shutdown.", workflowExecution.ExecutionId) } log.Printf("[DEBUG][%s] Finished shutdown (after %d seconds). ", workflowExecution.ExecutionId, sleepDuration) //Finished shutdown (after %d seconds). ", sleepDuration) // Allows everything to finish in subprocesses (apps) time.Sleep(time.Duration(sleepDuration) * time.Second) os.Exit(3) } // } func isRunningInCluster() bool { _, existsHost := os.LookupEnv("KUBERNETES_SERVICE_HOST") _, existsPort := os.LookupEnv("KUBERNETES_SERVICE_PORT") return existsHost && existsPort } func buildEnvVars(envMap map[string]string) []corev1.EnvVar { var envVars []corev1.EnvVar for key, value := range envMap { envVars = append(envVars, corev1.EnvVar{Name: key, Value: value}) } return envVars } func getKubernetesClient() (*kubernetes.Clientset, error) { if isRunningInCluster() { config, err := rest.InClusterConfig() if err != nil { return nil, err } clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } return clientset, nil } else { home := homedir.HomeDir() kubeconfigPath := filepath.Join(home, ".kube", "config") config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { return nil, err } clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } return clientset, nil } } // Deploys the internal worker whenever something happens func deployApp(cli *dockerclient.Client, image string, identifier string, env []string, workflowExecution shuffle.WorkflowExecution, action shuffle.Action) error { // log.Printf("################################### new call to deployApp ###################################") // log.Printf("image: %s", image) // log.Printf("identifier: %s", identifier) // log.Printf("execution: %+v", workflowExecution) log.Printf("[DEBUG] Adding SHUFFLE_APP_SDK_TIMEOUT=%s", 
os.Getenv("SHUFFLE_APP_SDK_TIMEOUT"))
	env = append(env, fmt.Sprintf("SHUFFLE_APP_SDK_TIMEOUT=%s", os.Getenv("SHUFFLE_APP_SDK_TIMEOUT")))

	if os.Getenv("IS_KUBERNETES") == "true" {
		namespace := "shuffle"
		localRegistry := os.Getenv("REGISTRY_URL")

		// Translate KEY=VALUE strings into a map for the pod spec.
		envMap := make(map[string]string)
		for _, envStr := range env {
			parts := strings.SplitN(envStr, "=", 2)
			if len(parts) == 2 {
				envMap[parts[0]] = parts[1]
			}
		}

		clientset, err := getKubernetesClient()
		if err != nil {
			fmt.Println("[ERROR]Error getting kubernetes client:", err)
			// BUGFIX: the original fell through with a nil clientset and
			// panicked on the Create call below. Fail the deploy instead.
			return err
		}

		log.Printf("[DEBUG] Got kubernetes client")

		str := strings.ToLower(identifier)
		strSplit := strings.Split(str, "_")
		value := strSplit[0]
		value = strings.ReplaceAll(value, "_", "-")

		// checking if app is generated or not
		// NOTE(review): assumes image always has a ":<app>_<version>" tag — verify upstream callers.
		appDetails := strings.Split(image, ":")[1]
		appDetailsSplit := strings.Split(appDetails, "_")
		appName := strings.Join(appDetailsSplit[:len(appDetailsSplit)-1], "_")
		appVersion := appDetailsSplit[len(appDetailsSplit)-1]

		for _, app := range workflowExecution.Workflow.Actions {
			if app.AppName == appName && app.AppVersion == appVersion {
				if app.Generated == true {
					log.Printf("[DEBUG] Generated app, setting local registry")
					image = fmt.Sprintf("%s/%s", localRegistry, image)
					break
				} else {
					log.Printf("[DEBUG] Not generated app, setting shuffle registry")
				}
			}
		}

		//fix naming convention
		podUuid := uuid.NewV4().String()
		podName := fmt.Sprintf("%s-%s", value, podUuid)

		pod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: podName,
				Labels: map[string]string{
					"app":         "shuffle-app",
					"executionId": workflowExecution.ExecutionId,
				},
			},
			Spec: corev1.PodSpec{
				// NodeName: "worker1"
				RestartPolicy: "Never",
				Containers: []corev1.Container{
					{
						Name:  value,
						Image: image,
						Env:   buildEnvVars(envMap),
						// ImagePullPolicy: corev1.PullAlways,
					},
				},
			},
		}

		createdPod, err := clientset.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{})
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error creating pod: %v\n", err)
			// BUGFIX: the original continued and dereferenced the nil
			// createdPod below. Propagate the failure instead.
			return err
		}

		fmt.Printf("[DEBUG] Created pod %q in namespace %q\n", createdPod.Name, createdPod.Namespace)
	} else {
		// form basic hostConfig
		ctx := context.Background()

		if action.AppName == "shuffle-subflow" {
			// Automatic replacement of URL
			for paramIndex, param := range action.Parameters {
				if param.Name != "backend_url" {
					continue
				}

				if strings.Contains(param.Value, "shuffle-backend") {
					// Automatic replacement as this is default
					action.Parameters[paramIndex].Value = os.Getenv("BASE_URL")
					log.Printf("[DEBUG][%s] Replaced backend_url with %s", workflowExecution.ExecutionId, os.Getenv("BASE_URL"))
				}
			}
		}

		hostConfig := &container.HostConfig{
			LogConfig: container.LogConfig{
				Type: "json-file",
				Config: map[string]string{
					"max-size": "10m",
				},
			},
			Resources: container.Resources{},
		}

		// Share the worker's network namespace so apps can reach it directly.
		hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))

		// Removing because log extraction should happen first
		if cleanupEnv == "true" {
			hostConfig.AutoRemove = true
		}

		// FIXME: Add proper foldermounts here
		volumeBinds := []string{}
		if len(volumeBinds) > 0 {
			log.Printf("[DEBUG] Setting up binds for container!")
			hostConfig.Binds = volumeBinds
			hostConfig.Mounts = []mount.Mount{}
			for _, bind := range volumeBinds {
				if !strings.Contains(bind, ":") || strings.Contains(bind, "..") || strings.HasPrefix(bind, "~") {
					log.Printf("[WARNING] Bind %s is invalid.", bind)
					continue
				}

				log.Printf("[DEBUG] Appending bind %s", bind)
				bindSplit := strings.Split(bind, ":")
				sourceFolder := bindSplit[0]
				// BUGFIX: the mount target used bindSplit[0] (the source
				// again); the destination is the part after the colon.
				destinationFolder := bindSplit[1]
				hostConfig.Mounts = append(hostConfig.Mounts, mount.Mount{
					Type:   mount.TypeBind,
					Source: sourceFolder,
					Target: destinationFolder,
				})
			}
		} else {
			//log.Printf("[WARNING] Not mounting folders")
		}

		config := &container.Config{
			Image: image,
			Env:   env,
		}

		// Checking as late as possible, just in case. The cache entry acts as
		// a dedup marker so the same action is not deployed twice.
		newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
		_, err := shuffle.GetCache(ctx, newExecId)
		if err == nil {
			log.Printf("\n\n[DEBUG] Result for %s already found - returning\n\n", newExecId)
			return nil
		}

		cacheData := []byte("1")
		err = shuffle.SetCache(ctx, newExecId, cacheData, 30)
		if err != nil {
			log.Printf("[WARNING] Failed setting cache for action %s: %s", newExecId, err)
		} else {
			log.Printf("[DEBUG] Adding %s to cache. Name: %s", newExecId, action.Name)
		}

		if action.ExecutionDelay > 0 {
			log.Printf("[DEBUG] Running app %s in docker with delay of %d", action.Name, action.ExecutionDelay)
			waitTime := time.Duration(action.ExecutionDelay) * time.Second
			time.AfterFunc(waitTime, func() {
				DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
			})
		} else {
			log.Printf("[DEBUG] Running app %s in docker NORMALLY as there is no delay set with identifier %s", action.Name, identifier)
			returnvalue := DeployContainer(ctx, cli, config, hostConfig, identifier, workflowExecution, newExecId)
			log.Printf("[DEBUG] Normal deploy ret: %s", returnvalue)
			return returnvalue
		}

		return nil
	}

	return nil
}

// cleanupExecution deletes every app pod labeled with this execution, then the
// worker pod itself.
func cleanupExecution(clientset *kubernetes.Clientset, workflowExecution shuffle.WorkflowExecution, namespace string) error {
	workerName := fmt.Sprintf("worker-%s", workflowExecution.ExecutionId)
	labelSelector := fmt.Sprintf("app=shuffle-app,executionId=%s", workflowExecution.ExecutionId)

	podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		return fmt.Errorf("[ERROR]failed to list apps with label selector %s: %v", labelSelector, err)
	}

	for _, pod := range
podList.Items {
		// Delete each app pod belonging to this execution.
		err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
		if err != nil {
			return fmt.Errorf("failed to delete app %s: %v", pod.Name, err)
		}

		fmt.Printf("App %s in namespace %s deleted.\n", pod.Name, namespace)
	}

	// Finally remove the worker pod itself.
	podErr := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), workerName, metav1.DeleteOptions{})
	if podErr != nil {
		return fmt.Errorf("[ERROR] failed to delete the worker %s in namespace %s: %v", workerName, namespace, podErr)
	}

	fmt.Printf("[DEBUG] %s in namespace %s deleted.\n", workerName, namespace)
	return nil
}

// DeployContainer creates and starts a single app container. On a container
// name conflict it retries once with a random UUID suffix; if the worker's
// network cannot be joined it retries once more without NetworkMode. On any
// hard failure the dedup cache entry (newExecId) is deleted so the action can
// be deployed again later.
func DeployContainer(ctx context.Context, cli *dockerclient.Client, config *container.Config, hostConfig *container.HostConfig, identifier string, workflowExecution shuffle.WorkflowExecution, newExecId string) error {
	cont, err := cli.ContainerCreate(
		ctx,
		config,
		hostConfig,
		nil,
		nil,
		identifier,
	)
	if err != nil {
		//log.Printf("[ERROR] Failed creating container: %s", err)
		if !strings.Contains(err.Error(), "Conflict. The container name") {
			log.Printf("[ERROR] Container CREATE error (1): %s", err)

			cacheErr := shuffle.DeleteCache(ctx, newExecId)
			if cacheErr != nil {
				log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
			}

			return err
		} else {
			// Name conflict: append a random suffix and retry the create once.
			parsedUuid := uuid.NewV4()
			identifier = fmt.Sprintf("%s-%s", identifier, parsedUuid)
			//hostConfig.NetworkMode = container.NetworkMode(fmt.Sprintf("container:worker-%s", workflowExecution.ExecutionId))
			log.Printf("[DEBUG] 2 - Identifier: %s", identifier)
			cont, err = cli.ContainerCreate(
				context.Background(),
				config,
				hostConfig,
				nil,
				nil,
				identifier,
			)
			if err != nil {
				log.Printf("[ERROR] Container create error (2): %s", err)

				cacheErr := shuffle.DeleteCache(ctx, newExecId)
				if cacheErr != nil {
					log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
				}

				return err
			}
		}
	}

	err = cli.ContainerStart(ctx, cont.ID, types.ContainerStartOptions{})
	if err != nil {
		if strings.Contains(fmt.Sprintf("%s", err), "cannot join network") || strings.Contains(fmt.Sprintf("%s", err), "No such container") {
			// The worker's network namespace is unavailable: recreate the
			// container without NetworkMode and try once more.
			parsedUuid := uuid.NewV4()
			identifier = fmt.Sprintf("%s-%s-nonetwork", identifier, parsedUuid)
			hostConfig = &container.HostConfig{
				LogConfig: container.LogConfig{
					Type: "json-file",
					Config: map[string]string{
						"max-size": "10m",
					},
				},
				Resources: container.Resources{},
			}

			cont, err = cli.ContainerCreate(
				context.Background(),
				config,
				hostConfig,
				nil,
				nil,
				identifier,
			)
			if err != nil {
				log.Printf("[ERROR] Container create error (3): %s", err)

				cacheErr := shuffle.DeleteCache(ctx, newExecId)
				if cacheErr != nil {
					log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
				}

				return err
			}

			log.Printf("[DEBUG] Running secondary check without network with worker")
			err = cli.ContainerStart(ctx, cont.ID, types.ContainerStartOptions{})
		}

		if err != nil {
			log.Printf("[ERROR] Failed to start container in environment %s: %s", environment, err)

			cacheErr := shuffle.DeleteCache(ctx, newExecId)
			if cacheErr != nil {
				log.Printf("[ERROR] FAILED Deleting cache for %s: %s", newExecId, cacheErr)
			}

			//shutdown(workflowExecution, workflowExecution.Workflow.ID, true)
			return err
		}
	}

	log.Printf("[DEBUG] Container %s was created for %s", cont.ID, identifier)

	// Waiting to see if it exits.. Stupid, but stable(r)
	if workflowExecution.ExecutionSource != "default" {
		log.Printf("[INFO] Handling NON-default execution source %s - NOT waiting or validating!", workflowExecution.ExecutionSource)
	} else if workflowExecution.ExecutionSource == "default" {
		log.Printf("[INFO] Handling DEFAULT execution source %s - SKIPPING wait anyway due to exited issues!", workflowExecution.ExecutionSource)
	}

	log.Printf("[DEBUG] Deployed container ID %s", cont.ID)
	//containerIds = append(containerIds, cont.ID)
	return nil
}

// removeContainer force-removes a container (volumes included). NOTE(review):
// the actual stop/remove calls are commented out, so today this only creates
// a docker client and returns — confirm before relying on it.
func removeContainer(containername string) error {
	ctx := context.Background()
	cli, err := dockerclient.NewEnvClient()
	if err != nil {
		log.Printf("[DEBUG] Unable to create docker client: %s", err)
		return err
	}

	// FIXME - ucnomment
	// containers, err := cli.ContainerList(ctx, types.ContainerListOptions{
	//	All: true,
	// })

	_ = ctx
	_ = cli

	//if err := cli.ContainerStop(ctx, containername, nil); err != nil {
	//	log.Printf("Unable to stop container %s - running removal anyway, just in case: %s", containername, err)
	//}

	removeOptions := types.ContainerRemoveOptions{
		RemoveVolumes: true,
		Force:         true,
	}

	// FIXME - remove comments etc
	_ = removeOptions

	//if err := cli.ContainerRemove(ctx, containername, removeOptions); err != nil {
	//	log.Printf("Unable to remove container: %s", err)
	//}

	return nil
}

// runFilter handles the built-in "filter" app locally instead of deploying a
// container for it. Currently only a stub for the "filter_cases" label.
func runFilter(workflowExecution shuffle.WorkflowExecution, action shuffle.Action) {
	// 1. Get the parameter $.#.id
	if action.Label == "filter_cases" && len(action.Parameters) > 0 {
		if action.Parameters[0].Variant == "ACTION_RESULT" {
			param := action.Parameters[0]
			value := param.Value
			_ = value

			// Loop cases..
Hmm, that's tricky } } else { log.Printf("No handler for filter %s with %d params", action.Label, len(action.Parameters)) } } func removeIndex(s []string, i int) []string { s[len(s)-1], s[i] = s[i], s[len(s)-1] return s[:len(s)-1] } func handleExecutionResult(workflowExecution shuffle.WorkflowExecution) { ctx := context.Background() //log.Printf("[DEBUG][%s] Pre DecideExecution", workflowExecution.ExecutionId) workflowExecution, relevantActions := shuffle.DecideExecution(ctx, workflowExecution, environment) startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId) dockercli, err := dockerclient.NewEnvClient() if err != nil { log.Printf("[ERROR] Unable to create docker client (3): %s", err) return } // log.Printf("\n\n[DEBUG] Got %d relevant action(s) to run!\n\n", len(relevantActions)) for _, action := range relevantActions { appname := action.AppName appversion := action.AppVersion appname = strings.Replace(appname, ".", "-", -1) appversion = strings.Replace(appversion, ".", "-", -1) parsedAppname := strings.Replace(strings.ToLower(action.AppName), " ", "-", -1) image := fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion) if strings.Contains(image, " ") { image = strings.ReplaceAll(image, " ", "-") } // Added UUID to identifier just in case //identifier := fmt.Sprintf("%s_%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId, uuid.NewV4()) identifier := fmt.Sprintf("%s_%s_%s_%s", appname, appversion, action.ID, workflowExecution.ExecutionId) if strings.Contains(identifier, " ") { identifier = strings.ReplaceAll(identifier, " ", "-") } //if arrayContains(executed, action.ID) || arrayContains(visited, action.ID) { // log.Printf("[WARNING] Action %s is already executed") // continue //} //visited = append(visited, action.ID) //executed = append(executed, action.ID) // FIXME - check whether it's running locally yet too stats, err := 
dockercli.ContainerInspect(context.Background(), identifier)
		if err != nil || stats.ContainerJSONBase.State.Status != "running" {
			// REMOVE
			if err == nil {
				log.Printf("[DEBUG][%s] Docker Container Status: %s, should kill: %s", workflowExecution.ExecutionId, stats.ContainerJSONBase.State.Status, identifier)
				err = removeContainer(identifier)
				if err != nil {
					log.Printf("Error killing container: %s", err)
				}
			} else {
				//log.Printf("WHAT TO DO HERE?: %s", err)
			}
		} else if stats.ContainerJSONBase.State.Status == "running" {
			// Already running: skip redeploying this action.
			continue
		}

		if len(action.Parameters) == 0 {
			action.Parameters = []shuffle.WorkflowAppActionParameter{}
		}

		if len(action.Errors) == 0 {
			action.Errors = []string{}
		}

		// marshal action and put it in there rofl
		log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters))

		actionData, err := json.Marshal(action)
		if err != nil {
			log.Printf("[WARNING] Failed unmarshalling action: %s", err)
			continue
		}

		// Built-in filter app is handled locally instead of in a container.
		if action.AppID == "0ca8887e-b4af-4e3e-887c-87e9d3bc3d3e" {
			log.Printf("[DEBUG] Should run filter: %#v\n\n", action)
			runFilter(workflowExecution, action)
			continue
		}

		executionData, err := json.Marshal(workflowExecution)
		if err != nil {
			log.Printf("[ERROR] Failed marshalling executiondata: %s", err)
			executionData = []byte("")
		}

		// Sending full execution so that it won't have to load in every app
		// This might be an issue if they can read environments, but that's alright
		// if everything is generated during execution
		//log.Printf("[DEBUG][%s] Deployed with CALLBACK_URL %s and BASE_URL %s", workflowExecution.ExecutionId, appCallbackUrl, baseUrl)
		env := []string{
			fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId),
			fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization),
			fmt.Sprintf("CALLBACK_URL=%s", baseUrl),
			fmt.Sprintf("BASE_URL=%s", appCallbackUrl),
			fmt.Sprintf("TZ=%s", timezone),
			fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", os.Getenv("SHUFFLE_LOGS_DISABLED")),
		}

		// Env vars have a size cap; trim oversized params so the app starts.
		if len(actionData) >= 100000 {
			log.Printf("[WARNING] Omitting some data from action execution. Length: %d. Fix in SDK!", len(actionData))

			newParams := []shuffle.WorkflowAppActionParameter{}
			for _, param := range action.Parameters {
				paramData, err := json.Marshal(param)
				if err != nil {
					log.Printf("[WARNING] Failed to marshal param %s: %s", param.Name, err)
					newParams = append(newParams, param)
					continue
				}

				if len(paramData) >= 50000 {
					log.Printf("[WARNING] Removing a lot of data from param %s with length %d", param.Name, len(paramData))
					param.Value = "SHUFFLE_AUTO_REMOVED"
				}

				newParams = append(newParams, param)
			}

			action.Parameters = newParams
			actionData, err = json.Marshal(action)
			if err == nil {
				log.Printf("[DEBUG] Ran data replace on action %s. new length: %d", action.Name, len(actionData))
			} else {
				log.Printf("[WARNING] Failed to marshal new actionData: %s", err)
			}
		} else {
			//log.Printf("[DEBUG] Actiondata is NOT 100000 in length. Adding as normal.")
		}

		actionEnv := fmt.Sprintf("ACTION=%s", string(actionData))
		env = append(env, actionEnv)

		if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" {
			//log.Printf("APPENDING PROXY TO THE APP!")
			env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY")))
			env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY")))
			env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY")))
		}

		// Fixes issue:
		// standard_go init_linux.go:185: exec user process caused "argument list too long"
		// https://devblogs.microsoft.com/oldnewthing/20100203-00/?p=15083
		// FIXME: Ensure to NEVER do this anymore
		// This potentially breaks too much stuff. Better to have the app poll the data.
		_ = executionData
		/*
			maxSize := 32700 - len(string(actionData)) - 2000
			if len(executionData) < maxSize {
				log.Printf("[INFO] ADDING FULL_EXECUTION because size is smaller than %d", maxSize)
				env = append(env, fmt.Sprintf("FULL_EXECUTION=%s", string(executionData)))
			} else {
				log.Printf("[WARNING] Skipping FULL_EXECUTION because size is larger than %d", maxSize)
			}
		*/

		// Uses a few ways of getting / checking if an app is available
		// 1. Try original with lowercase
		// 2. Go to original (no spaces)
		// 3. Add remote repo location
		images := []string{
			image,
			fmt.Sprintf("%s:%s_%s", baseimagename, parsedAppname, action.AppVersion),
			fmt.Sprintf("%s/%s:%s_%s", registryName, baseimagename, parsedAppname, action.AppVersion),
		}

		// If cleanup is set, it should run for efficiency
		pullOptions := types.ImagePullOptions{}
		if cleanupEnv == "true" {
			err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
			if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
				if strings.Contains(err.Error(), "exited prematurely") {
					log.Printf("[DEBUG] Shutting down (2)")
					shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
					return
				}

				// Attempt 2: let the backend serve the image.
				err := downloadDockerImageBackend(&http.Client{Timeout: 60 * time.Second}, image)
				executed := false
				if err == nil {
					log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", image)
					//err = deployApp(dockercli, image, identifier, env, workflow, action)
					err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
					if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
						if strings.Contains(err.Error(), "exited prematurely") {
							log.Printf("[DEBUG] Shutting down (41)")
							shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
							return
						}
					} else {
						executed = true
					}
				}

				if !executed {
					// Attempt 3: fully-qualified registry image, then a direct pull.
					image = images[2]
					err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
					if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
						if strings.Contains(err.Error(), "exited prematurely") {
							log.Printf("[DEBUG] Shutting down (3)")
							shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
							return
						}

						//log.Printf("[WARNING] Failed CLEANUP execution. Downloading image %s remotely.", image)
						log.Printf("[WARNING] Failed to download image %s (CLEANUP): %s", image, err)
						reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
						if err != nil {
							log.Printf("[ERROR] Failed getting %s. Couldn't be find locally, AND is missing.", image)
							log.Printf("[DEBUG] Shutting down (4)")
							shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
							return
						} else {
							baseTag := strings.Split(image, ":")
							if len(baseTag) > 1 {
								tag := baseTag[1]
								log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)

								// Remapping
								ctx := context.Background()
								dockercli.ImageTag(ctx, image, fmt.Sprintf("frikky/shuffle:%s", tag))
								dockercli.ImageTag(ctx, image, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
							}
						}

						// Drain the pull stream; docker reports errors inline as JSON.
						buildBuf := new(strings.Builder)
						_, err = io.Copy(buildBuf, reader)
						if err != nil && !strings.Contains(fmt.Sprintf("%s", err.Error()), "Conflict. The container name") {
							log.Printf("[ERROR] Error in IO copy: %s", err)
							log.Printf("[DEBUG] Shutting down (5)")
							shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
							return
						} else {
							if strings.Contains(buildBuf.String(), "errorDetail") {
								log.Printf("[ERROR] Docker build:\n%s\nERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), image)
								log.Printf("[DEBUG] Shutting down (6)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							}

							log.Printf("[INFO] Successfully downloaded %s", image)
						}

						err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
						if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
							log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
							if strings.Contains(err.Error(), "exited prematurely") {
								log.Printf("[DEBUG] Shutting down (7)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							}

							if strings.Contains(err.Error(), "No such image") {
								//log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
								log.Printf("[ERROR] Image doesn't exist. Shutting down")
								log.Printf("[DEBUG] Shutting down (8)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							}
						}
					}
				}
			}
		} else {
			err = deployApp(dockercli, images[0], identifier, env, workflowExecution, action)
			if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
				log.Printf("[DEBUG] Failed deploying app? %s", err)
				if strings.Contains(err.Error(), "exited prematurely") {
					log.Printf("[DEBUG] Shutting down (9)")
					shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
					return
				}

				// Trying to replace with lowercase to deploy again. This seems to work with Dockerhub well.
				// FIXME: Should try to remotely download directly if this persists.
				image = images[1]
				err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
				if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
					if strings.Contains(err.Error(), "exited prematurely") {
						log.Printf("[DEBUG] Shutting down (10)")
						shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
						return
					}

					log.Printf("[DEBUG][%s] Failed deploy. Downloading image %s: %s", workflowExecution.ExecutionId, image, err)
					err := downloadDockerImageBackend(&http.Client{Timeout: 60 * time.Second}, image)
					executed := false
					if err == nil {
						log.Printf("[DEBUG] Downloaded image %s from backend (CLEANUP)", image)
						//err = deployApp(dockercli, image, identifier, env, workflow, action)
						err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
						if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
							if strings.Contains(err.Error(), "exited prematurely") {
								log.Printf("[DEBUG] Shutting down (40)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							}
						} else {
							executed = true
						}
					}

					if !executed {
						image = images[2]
						err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
						if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
							if strings.Contains(err.Error(), "exited prematurely") {
								log.Printf("[DEBUG] Shutting down (11)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							}

							log.Printf("[WARNING] Failed deploying image THREE TIMES. Attempting to download %s as last resort from backend and dockerhub: %s", image, err)
							reader, err := dockercli.ImagePull(context.Background(), image, pullOptions)
							if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
								log.Printf("[ERROR] Failed getting %s. The couldn't be find locally, AND is missing.", image)
								log.Printf("[DEBUG] Shutting down (12)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							} else {
								baseTag := strings.Split(image, ":")
								if len(baseTag) > 1 {
									tag := baseTag[1]
									log.Printf("[DEBUG] Creating tag copies of registry downloaded containers from tag %s", tag)

									// Remapping
									ctx := context.Background()
									dockercli.ImageTag(ctx, image, fmt.Sprintf("frikky/shuffle:%s", tag))
									dockercli.ImageTag(ctx, image, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
								}
							}

							// NOTE(review): if ImagePull failed with a "Conflict"
							// error, reader is nil here and io.Copy would panic —
							// verify whether that branch is reachable.
							buildBuf := new(strings.Builder)
							_, err = io.Copy(buildBuf, reader)
							if err != nil {
								log.Printf("[ERROR] Error in IO copy: %s", err)
								log.Printf("[DEBUG] Shutting down (13)")
								shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
								return
							} else {
								if strings.Contains(buildBuf.String(), "errorDetail") {
									log.Printf("[ERROR] Docker build:\n%s\nERROR ABOVE: Trying to pull tags from: %s", buildBuf.String(), image)
									log.Printf("[DEBUG] Shutting down (14)")
									shutdown(workflowExecution, action.ID, fmt.Sprintf("Error deploying container: %s", buildBuf.String()), true)
									return
								}

								log.Printf("[INFO] Successfully downloaded %s", image)
							}

							err = deployApp(dockercli, image, identifier, env, workflowExecution, action)
							if err != nil && !strings.Contains(err.Error(), "Conflict. The container name") {
								log.Printf("[ERROR] Failed deploying image for the FOURTH time. Aborting if the image doesn't exist")
								if strings.Contains(err.Error(), "exited prematurely") {
									log.Printf("[DEBUG] Shutting down (15)")
									shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
									return
								}

								if strings.Contains(err.Error(), "No such image") {
									//log.Printf("[WARNING] Failed deploying %s from image %s: %s", identifier, image, err)
									log.Printf("[ERROR] Image doesn't exist. Shutting down")
									log.Printf("[DEBUG] Shutting down (16)")
									shutdown(workflowExecution, action.ID, fmt.Sprintf("%s", err.Error()), true)
									return
								}
							}
						}
					}
				}
			}
		}

		//log.Printf("[INFO][%s] Adding visited (3): %s (%s). Actions: %d, Results: %d", workflowExecution.ExecutionId, action.Label, action.ID, len(workflowExecution.Workflow.Actions), len(workflowExecution.Results))
		visited = append(visited, action.ID)
		executed = append(executed, action.ID)

		// If children of action.ID are NOT in executed:
		// Remove them from visited.
		//log.Printf("EXECUTED: %#v", executed)
	}

	//log.Printf(nextAction)
	//log.Printf(startAction, children[startAction])

	// FIXME - new request here
	// FIXME - clean up stopped (remove) containers with this execution id
	err = shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra)
	if err != nil {
		log.Printf("\n\n[ERROR] Failed to update exec variables for execution %s: %s (2)\n\n", workflowExecution.ExecutionId, err)
	}

	// All actions have a result (triggers count via "extra"): check whether
	// the whole execution can shut down.
	if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
		shutdownCheck := true
		for _, result := range workflowExecution.Results {
			if result.Status == "EXECUTING" || result.Status == "WAITING" {
				// Cleaning up executing stuff
				shutdownCheck = false
				// USED TO BE CONTAINER REMOVAL
				// FIXME - send POST request to kill the container
				//log.Printf("Should remove (POST request) stopped containers")
			}
		}

		if shutdownCheck {
			log.Printf("[INFO][%s] BREAKING BECAUSE RESULTS IS SAME LENGTH AS ACTIONS. SHOULD CHECK ALL RESULTS FOR WHETHER THEY'RE DONE", workflowExecution.ExecutionId)
			validateFinished(workflowExecution)
			log.Printf("[DEBUG][%s] Shutting down (17)", workflowExecution.ExecutionId)

			if os.Getenv("IS_KUBERNETES") == "true" {
				clientset, err := getKubernetesClient()
				if err != nil {
					fmt.Println("[ERROR]Error getting kubernetes client:", err)
					os.Exit(1)
				}

				cleanupExecution(clientset, workflowExecution, "shuffle")
			} else {
				shutdown(workflowExecution, "", "", true)
			}

			return
		}
	}

	time.Sleep(time.Duration(sleepTime) * time.Second)
	return
}

// executionInit builds the parent/child branch maps and start-node
// bookkeeping for a fresh workflow execution.
func executionInit(workflowExecution shuffle.WorkflowExecution) error {
	parents := map[string][]string{}
	children := map[string][]string{}
	nextActions := []string{}
	extra := 0

	startAction := workflowExecution.Start
	//log.Printf("[INFO][%s] STARTACTION: %s", workflowExecution.ExecutionId, startAction)
	if len(startAction) == 0 {
		log.Printf("[INFO][%s] Didn't find execution start action. Setting it to workflow start action.", workflowExecution.ExecutionId)
		startAction = workflowExecution.Workflow.Start
	}

	// Setting up extra counter
	for _, trigger := range workflowExecution.Workflow.Triggers {
		//log.Printf("[DEBUG] Appname trigger (0): %s", trigger.AppName)
		if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
			extra += 1
		}
	}

	nextActions = append(nextActions, startAction)
	for _, branch := range workflowExecution.Workflow.Branches {
		// Check what the parent is first.
If it's trigger - skip
		sourceFound := false
		destinationFound := false
		for _, action := range workflowExecution.Workflow.Actions {
			if action.ID == branch.SourceID {
				sourceFound = true
			}

			if action.ID == branch.DestinationID {
				destinationFound = true
			}
		}

		// Special triggers (User Input / Subflow) count as graph nodes too
		for _, trigger := range workflowExecution.Workflow.Triggers {
			//log.Printf("Appname trigger (0): %s", trigger.AppName)
			if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" {
				if trigger.ID == branch.SourceID {
					sourceFound = true
				} else if trigger.ID == branch.DestinationID {
					destinationFound = true
				}
			}
		}

		if sourceFound {
			parents[branch.DestinationID] = append(parents[branch.DestinationID], branch.SourceID)
		} else {
			log.Printf("[DEBUG] ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
		}

		if destinationFound {
			children[branch.SourceID] = append(children[branch.SourceID], branch.DestinationID)
		} else {
			// BUGFIX: log the DESTINATION id here - it's the destination lookup that
			// failed (the previous code logged branch.SourceID).
			log.Printf("[DEBUG] ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.DestinationID)
		}
	}

	/*
		log.Printf("\n\n\n[INFO] CHILDREN FOUND: %#v", children)
		log.Printf("[INFO] PARENTS FOUND: %#v", parents)
		log.Printf("[INFO] NEXT ACTIONS: %#v\n\n", nextActions)
	*/
	log.Printf("[INFO][%s] shuffle.Actions: %d + Special shuffle.Triggers: %d", workflowExecution.ExecutionId, len(workflowExecution.Workflow.Actions), extra)

	// Collect the unique app images required by actions that run in THIS
	// worker's environment.
	onpremApps := []string{}
	toExecuteOnprem := []string{}
	for _, action := range workflowExecution.Workflow.Actions {
		// Case-insensitive environment match (replaces the double ToLower compare)
		if !strings.EqualFold(action.Environment, environment) {
			continue
		}

		toExecuteOnprem = append(toExecuteOnprem, action.ID)
		actionName := fmt.Sprintf("%s:%s_%s", baseimagename, action.AppName, action.AppVersion)
		found := false
		for _, app := range onpremApps {
			if actionName == app {
				found = true
				break
			}
		}

		if !found {
			onpremApps = append(onpremApps, actionName)
		}
	}

	if len(onpremApps) == 0 {
		return fmt.Errorf("No apps to handle onprem (%s)", environment)
	}

	pullOptions := types.ImagePullOptions{}
	_ = pullOptions

	for _, image :=
range onpremApps { //log.Printf("[INFO] Image: %s", image) // Kind of gambling that the image exists. if strings.Contains(image, " ") { image = strings.ReplaceAll(image, " ", "-") } // FIXME: Reimplement for speed later // Skip to make it faster //reader, err := dockercli.ImagePull(context.Background(), image, pullOptions) //if err != nil { // log.Printf("Failed getting %s. The app is missing or some other issue", image) // shutdown(workflowExecution) //} ////io.Copy(os.Stdout, reader) //_ = reader //log.Printf("Successfully downloaded and built %s", image) } ctx := context.Background() visited := []string{} executed := []string{} environments := []string{} for _, action := range workflowExecution.Workflow.Actions { found := false for _, environment := range environments { if action.Environment == environment { found = true break } } if !found { environments = append(environments, action.Environment) } } //var visited []string //var executed []string err := shuffle.UpdateExecutionVariables(ctx, workflowExecution.ExecutionId, startAction, children, parents, visited, executed, nextActions, environments, extra) if err != nil { log.Printf("\n\n[ERROR] Failed to update exec variables for execution %s: %s\n\n", workflowExecution.ExecutionId, err) } return nil } func handleDefaultExecution(client *http.Client, req *http.Request, workflowExecution shuffle.WorkflowExecution) error { // if no onprem runs (shouldn't happen, but extra check), exit // if there are some, load the images ASAP for the app ctx := context.Background() //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId) startAction, extra, _, _, _, _, _, _ := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId) err := executionInit(workflowExecution) if err != nil { log.Printf("[INFO] Workflow setup failed for %s: %s", workflowExecution.ExecutionId, err) log.Printf("[DEBUG] Shutting down (18)") 
shutdown(workflowExecution, "", "", true) } log.Printf("[DEBUG] DEFAULT EXECUTION Startaction: %s", startAction) setWorkflowExecution(ctx, workflowExecution, false) streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl) for { //fullUrl := fmt.Sprintf("%s/api/v1/workflows/%s/executions/%s/abort", baseUrl, workflowExecution.Workflow.ID, workflowExecution.ExecutionId) //log.Printf("[INFO] URL: %s", fullUrl) req, err := http.NewRequest( "POST", streamResultUrl, bytes.NewBuffer([]byte(data)), ) newresp, err := topClient.Do(req) if err != nil { log.Printf("[ERROR] Failed making request (1): %s", err) time.Sleep(time.Duration(sleepTime) * time.Second) continue } defer newresp.Body.Close() body, err := ioutil.ReadAll(newresp.Body) if err != nil { log.Printf("[ERROR] Failed reading body (1): %s", err) time.Sleep(time.Duration(sleepTime) * time.Second) continue } if newresp.StatusCode != 200 { log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body)) if strings.Contains(string(body), "Workflowexecution is already finished") { log.Printf("[DEBUG] Shutting down (19)") shutdown(workflowExecution, "", "", true) } time.Sleep(time.Duration(sleepTime) * time.Second) continue } err = json.Unmarshal(body, &workflowExecution) if err != nil { log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err) time.Sleep(time.Duration(sleepTime) * time.Second) continue } if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" { log.Printf("[INFO][%s] Workflow execution is finished. 
Exiting worker.", workflowExecution.ExecutionId)
			log.Printf("[DEBUG] Shutting down (20)")

			//handle workerssssssssss
			if os.Getenv("IS_KUBERNETES") == "true" {
				// log.Printf("workflow execution: %#v", workflowExecution)
				clientset, err := getKubernetesClient()
				if err != nil {
					fmt.Println("[ERROR]Error getting kubernetes client:", err)
					os.Exit(1)
				}

				cleanupExecution(clientset, workflowExecution, "shuffle")
			} else {
				shutdown(workflowExecution, "", "", true)
			}
		}

		log.Printf("[INFO][%s] Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra)

		if workflowExecution.Status != "EXECUTING" {
			log.Printf("[WARNING][%s] Exiting as worker execution has status %s!", workflowExecution.ExecutionId, workflowExecution.Status)
			log.Printf("[DEBUG] Shutting down (21)")
			if os.Getenv("IS_KUBERNETES") == "true" {
				// log.Printf("workflow execution: %#v", workflowExecution)
				clientset, err := getKubernetesClient()
				if err != nil {
					fmt.Println("[ERROR]Error getting kubernetes client:", err)
					os.Exit(1)
				}

				cleanupExecution(clientset, workflowExecution, "shuffle")
			} else {
				shutdown(workflowExecution, "", "", true)
			}
		}

		setWorkflowExecution(ctx, workflowExecution, false)
		//handleExecutionResult(workflowExecution)
	}

	return nil
}

// arrayContains reports whether id is present in the visited slice.
func arrayContains(visited []string, id string) bool {
	for _, item := range visited {
		if item == id {
			return true
		}
	}

	return false
}

// getResult returns the stored ActionResult for the given action id, or a
// zero-value ActionResult when no result has been registered yet.
func getResult(workflowExecution shuffle.WorkflowExecution, id string) shuffle.ActionResult {
	for _, actionResult := range workflowExecution.Results {
		if actionResult.Action.ID == id {
			return actionResult
		}
	}

	return shuffle.ActionResult{}
}

// getAction looks up an action (or special trigger) in the workflow by id.
func getAction(workflowExecution shuffle.WorkflowExecution, id, environment string) shuffle.Action {
	for _, action := range workflowExecution.Workflow.Actions {
		if action.ID == id {
			return action
		}
	}

	for _, trigger := range workflowExecution.Workflow.Triggers {
		if trigger.ID
== id {
			// Wrap the trigger as a pseudo-action so callers can treat triggers
			// and actions uniformly.
			return shuffle.Action{
				ID:          trigger.ID,
				AppName:     trigger.AppName,
				Name:        trigger.AppName,
				Environment: environment,
				Label:       trigger.Label,
			}
			// BUGFIX: removed an unreachable log.Printf("FOUND TRIGGER: ...")
			// that followed the return above (dead code, flagged by go vet).
		}
	}

	return shuffle.Action{}
}

// runSkipAction posts a SUCCESS result for an action that should be skipped,
// so the backend can mark the node handled without executing it.
// Returns an error if the request could not be built, sent, or read.
func runSkipAction(client *http.Client, action shuffle.Action, workflowId, workflowExecutionId, authorization string, configuration string) error {
	timeNow := time.Now().Unix()
	result := shuffle.ActionResult{
		Action:        action,
		ExecutionId:   workflowExecutionId,
		Authorization: authorization,
		Result:        configuration,
		StartedAt:     timeNow,
		CompletedAt:   0,
		Status:        "SUCCESS",
	}

	resultData, err := json.Marshal(result)
	if err != nil {
		return err
	}

	streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl)
	req, err := http.NewRequest(
		"POST",
		streamUrl,
		bytes.NewBuffer([]byte(resultData)),
	)
	if err != nil {
		log.Printf("[WARNING] Error building skip request (0): %s", err)
		return err
	}

	newresp, err := client.Do(req)
	if err != nil {
		log.Printf("[WARNING] Error running skip request (0): %s", err)
		return err
	}

	defer newresp.Body.Close()
	body, err := ioutil.ReadAll(newresp.Body)
	if err != nil {
		log.Printf("[WARNING] Failed reading body when skipping (0): %s", err)
		return err
	}

	log.Printf("[INFO] Skip Action Body: %s", string(body))
	return nil
}

// Sends request back to backend to handle the node
func runUserInput(client *http.Client, action shuffle.Action, workflowId string, workflowExecution shuffle.WorkflowExecution, authorization string, configuration string, dockercli *dockerclient.Client) error {
	timeNow := time.Now().Unix()
	result := shuffle.ActionResult{
		Action:        action,
		ExecutionId:   workflowExecution.ExecutionId,
		Authorization: authorization,
		Result:        configuration,
		StartedAt:     timeNow,
		CompletedAt:   0,
		Status:        "WAITING",
	}

	// Checking for userinput to deploy subflow for it
	subflow := false
	subflowId := ""
	argument := ""
	continueUrl := "testing continue"
	cancelUrl := "testing cancel"
	for _, item := range action.Parameters {
		if item.Name == "subflow" {
			subflow = true
subflowId = item.Value } else if item.Name == "alertinfo" { argument = item.Value } } if subflow { log.Printf("[DEBUG] Should run action with subflow app with argument %#v", argument) newAction := shuffle.Action{ AppName: "shuffle-subflow", Name: "run_subflow", AppVersion: "1.0.0", Label: "User Input Subflow Execution", } identifier := fmt.Sprintf("%s_%s_%s_%s", newAction.AppName, newAction.AppVersion, action.ID, workflowExecution.ExecutionId) if strings.Contains(identifier, " ") { identifier = strings.ReplaceAll(identifier, " ", "-") } inputValue := UserInputSubflow{ Argument: argument, ContinueUrl: continueUrl, CancelUrl: cancelUrl, } parsedArgument, err := json.Marshal(inputValue) if err != nil { log.Printf("[ERROR] Failed to parse arguments: %s", err) parsedArgument = []byte(argument) } newAction.Parameters = []shuffle.WorkflowAppActionParameter{ shuffle.WorkflowAppActionParameter{ Name: "user_apikey", Value: workflowExecution.Authorization, }, shuffle.WorkflowAppActionParameter{ Name: "workflow", Value: subflowId, }, shuffle.WorkflowAppActionParameter{ Name: "argument", Value: string(parsedArgument), }, } newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{ Name: "source_workflow", Value: workflowExecution.Workflow.ID, }) newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{ Name: "source_execution", Value: workflowExecution.ExecutionId, }) newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{ Name: "source_node", Value: action.ID, }) newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{ Name: "source_auth", Value: workflowExecution.Authorization, }) newAction.Parameters = append(newAction.Parameters, shuffle.WorkflowAppActionParameter{ Name: "startnode", Value: "", }) // If cleanup is set, it should run for efficiency //appName := strings.Replace(identifier, fmt.Sprintf("_%s", action.ID), "", -1) //appName = 
strings.Replace(appName, fmt.Sprintf("_%s", workflowExecution.ExecutionId), "", -1) actionData, err := json.Marshal(newAction) if err != nil { return err } env := []string{ fmt.Sprintf("ACTION=%s", string(actionData)), fmt.Sprintf("EXECUTIONID=%s", workflowExecution.ExecutionId), fmt.Sprintf("AUTHORIZATION=%s", workflowExecution.Authorization), fmt.Sprintf("CALLBACK_URL=%s", baseUrl), fmt.Sprintf("BASE_URL=%s", appCallbackUrl), fmt.Sprintf("TZ=%s", timezone), fmt.Sprintf("SHUFFLE_LOGS_DISABLED=%s", os.Getenv("SHUFFLE_LOGS_DISABLED")), } if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" { //log.Printf("APPENDING PROXY TO THE APP!") env = append(env, fmt.Sprintf("HTTP_PROXY=%s", os.Getenv("HTTP_PROXY"))) env = append(env, fmt.Sprintf("HTTPS_PROXY=%s", os.Getenv("HTTPS_PROXY"))) env = append(env, fmt.Sprintf("NO_PROXY=%s", os.Getenv("NO_PROXY"))) } err = deployApp(dockercli, "frikky/shuffle:shuffle-subflow_1.0.0", identifier, env, workflowExecution, newAction) if err != nil { log.Printf("[ERROR] Failed to deploy subflow for user input trigger %s: %s", action.ID, err) } } else { log.Printf("[DEBUG] Running user input WITHOUT subflow") } resultData, err := json.Marshal(result) if err != nil { return err } streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl) req, err := http.NewRequest( "POST", streamUrl, bytes.NewBuffer([]byte(resultData)), ) if err != nil { log.Printf("[WARNING] Error building test request (2): %s", err) return err } newresp, err := client.Do(req) if err != nil { log.Printf("[WARNING] Error running test request (2): %s", err) return err } defer newresp.Body.Close() body, err := ioutil.ReadAll(newresp.Body) if err != nil { log.Printf("Failed reading body when waiting: %s", err) return err } log.Printf("[INFO] User Input Body: %s", string(body)) return nil } func runTestExecution(client *http.Client, workflowId, apikey string) (string, string) { executeUrl := fmt.Sprintf("%s/api/v1/workflows/%s/execute", baseUrl, workflowId) req, err := 
http.NewRequest( "GET", executeUrl, nil, ) if err != nil { log.Printf("Error building test request: %s", err) return "", "" } req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", apikey)) newresp, err := client.Do(req) if err != nil { log.Printf("[WARNING] Error running test request (3): %s", err) return "", "" } defer newresp.Body.Close() body, err := ioutil.ReadAll(newresp.Body) if err != nil { log.Printf("[WARNING] Failed reading body: %s", err) return "", "" } log.Printf("[INFO] Test Body: %s", string(body)) var workflowExecution shuffle.WorkflowExecution err = json.Unmarshal(body, &workflowExecution) if err != nil { log.Printf("Failed workflowExecution unmarshal: %s", err) return "", "" } return workflowExecution.Authorization, workflowExecution.ExecutionId } func handleWorkflowQueue(resp http.ResponseWriter, request *http.Request) { if request.Body == nil { resp.WriteHeader(http.StatusBadRequest) return } body, err := ioutil.ReadAll(request.Body) if err != nil { log.Printf("[WARNING] (3) Failed reading body for workflowqueue") resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) return } defer request.Body.Close() var actionResult shuffle.ActionResult err = json.Unmarshal(body, &actionResult) if err != nil { log.Printf("[ERROR] Failed shuffle.ActionResult unmarshaling (2): %s", err) //resp.WriteHeader(401) //resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) //return } if len(actionResult.ExecutionId) == 0 { log.Printf("[WARNING] No workflow execution id in action result. Data: %s", string(body)) resp.WriteHeader(400) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`))) return } // 1. Get the shuffle.WorkflowExecution(ExecutionId) from the database // 2. if shuffle.ActionResult.Authentication != shuffle.WorkflowExecution.Authentication -> exit // 3. Add to and update actionResult in workflowExecution // 4. 
Push to db // IF FAIL: Set executionstatus: abort or cancel ctx := context.Background() workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId) if err != nil { log.Printf("[ERROR][%s] Failed getting execution (workflowqueue) %s: %s", actionResult.ExecutionId, actionResult.ExecutionId, err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution ID %s because it doesn't exist locally."}`, actionResult.ExecutionId))) return } if workflowExecution.Authorization != actionResult.Authorization { log.Printf("[INFO] Bad authorization key when updating node (workflowQueue) %s. Want: %s, Have: %s", actionResult.ExecutionId, workflowExecution.Authorization, actionResult.Authorization) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key"}`))) return } if workflowExecution.Status == "FINISHED" { log.Printf("[DEBUG] Workflowexecution is already FINISHED. No further action can be taken") resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is already finished because it has status %s. Lastnode: %s"}`, workflowExecution.Status, workflowExecution.LastNode))) return } if workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" { if workflowExecution.Workflow.Configuration.ExitOnError { log.Printf("[WARNING] Workflowexecution already has status %s. 
No further action can be taken", workflowExecution.Status) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Workflowexecution is aborted because of %s with result %s and status %s"}`, workflowExecution.LastNode, workflowExecution.Result, workflowExecution.Status))) return } else { log.Printf("Continuing even though it's aborted.") } } log.Printf("[INFO][%s] Got result '%s' from '%s' with app '%s':'%s'", actionResult.ExecutionId, actionResult.Status, actionResult.Action.Label, actionResult.Action.AppName, actionResult.Action.AppVersion) //results = append(results, actionResult) //log.Printf("[INFO][%s] Time to execute %s (%s) with app %s:%s, function %s, env %s with %d parameters.", workflowExecution.ExecutionId, action.ID, action.Label, action.AppName, action.AppVersion, action.Name, action.Environment, len(action.Parameters)) //log.Printf("[DEBUG][%s] In workflowQueue with transaction", workflowExecution.ExecutionId) runWorkflowExecutionTransaction(ctx, 0, workflowExecution.ExecutionId, actionResult, resp) } // Will make sure transactions are always ran for an execution. This is recursive if it fails. Allowed to fail up to 5 times func runWorkflowExecutionTransaction(ctx context.Context, attempts int64, workflowExecutionId string, actionResult shuffle.ActionResult, resp http.ResponseWriter) { //log.Printf("[DEBUG][%s] IN WORKFLOWEXECUTION SUB!", actionResult.ExecutionId) workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId) if err != nil { log.Printf("[ERROR] Failed getting execution cache: %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`))) return } resultLength := len(workflowExecution.Results) setExecution := true workflowExecution, dbSave, err := shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, true, 0) if err != nil { log.Printf("[DEBUG] Rerunning transaction? 
%s", err) if strings.Contains(fmt.Sprintf("%s", err), "Rerun this transaction") { workflowExecution, err := shuffle.GetWorkflowExecution(ctx, workflowExecutionId) if err != nil { log.Printf("[ERROR] Failed getting execution cache (2): %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`))) return } resultLength = len(workflowExecution.Results) setExecution = true workflowExecution, dbSave, err = shuffle.ParsedExecutionResult(ctx, *workflowExecution, actionResult, false, 0) if err != nil { log.Printf("[ERROR] Failed execution of parsedexecution (2): %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution (2)"}`))) return } else { log.Printf("[DEBUG] Successfully got ParsedExecution with %d results!", len(workflowExecution.Results)) } } else { log.Printf("[ERROR] Failed execution of parsedexecution: %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting execution"}`))) return } } //log.Printf(`[DEBUG][%s] Got result %s from %s. Execution status: %s. Save: %#v. Parent: %#v`, actionResult.ExecutionId, actionResult.Status, actionResult.Action.ID, workflowExecution.Status, dbSave, workflowExecution.ExecutionParent) //dbSave := false //if len(results) != len(workflowExecution.Results) { // log.Printf("[DEBUG][%s] There may have been an issue in transaction queue. Result lengths: %d vs %d. 
Should check which exists the base results, but not in entire execution, then append.", workflowExecution.ExecutionId, len(results), len(workflowExecution.Results)) //} // Validating that action results hasn't changed // Handled using cachhing, so actually pretty fast cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) cache, err := shuffle.GetCache(ctx, cacheKey) if err == nil { //parsedValue := value.(*shuffle.WorkflowExecution) parsedValue := &shuffle.WorkflowExecution{} cacheData := []byte(cache.([]uint8)) err = json.Unmarshal(cacheData, &workflowExecution) if err != nil { log.Printf("[ERROR] Failed unmarshalling workflowexecution: %s", err) } if len(parsedValue.Results) > 0 && len(parsedValue.Results) != resultLength { setExecution = false if attempts > 5 { //log.Printf("\n\nSkipping execution input - %d vs %d. Attempts: (%d)\n\n", len(parsedValue.Results), resultLength, attempts) } attempts += 1 if len(workflowExecution.Results) <= len(workflowExecution.Workflow.Actions) { runWorkflowExecutionTransaction(ctx, attempts, workflowExecutionId, actionResult, resp) return } } } /* if value, found := requestCache.Get(cacheKey); found { parsedValue := value.(*shuffle.WorkflowExecution) if len(parsedValue.Results) > 0 && len(parsedValue.Results) != resultLength { setExecution = false if attempts > 5 { //log.Printf("\n\nSkipping execution input - %d vs %d. 
Attempts: (%d)\n\n", len(parsedValue.Results), resultLength, attempts) } attempts += 1 if len(workflowExecution.Results) <= len(workflowExecution.Workflow.Actions) { runWorkflowExecutionTransaction(ctx, attempts, workflowExecutionId, actionResult, resp) return } } } */ if setExecution || workflowExecution.Status == "FINISHED" || workflowExecution.Status == "ABORTED" || workflowExecution.Status == "FAILURE" { log.Printf("[DEBUG][%s] Running setexec with status %s and %d results", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results)) err = setWorkflowExecution(ctx, *workflowExecution, dbSave) if err != nil { resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed setting workflowexecution actionresult: %s"}`, err))) return } } else { log.Printf("[INFO][%s] Skipping setexec with status %s", workflowExecution.ExecutionId, workflowExecution.Status) // Just in case. Should MAYBE validate finishing another time as well. // This fixes issues with e.g. shuffle.Action -> shuffle.Trigger -> shuffle.Action. handleExecutionResult(*workflowExecution) //validateFinished(workflowExecution) } //if newExecutions && len(nextActions) > 0 { // log.Printf("[DEBUG][%s] New execution: %#v. NextActions: %#v", newExecutions, nextActions) // //handleExecutionResult(*workflowExecution) //} resp.WriteHeader(200) resp.Write([]byte(fmt.Sprintf(`{"success": true}`))) } func sendSelfRequest(actionResult shuffle.ActionResult) { log.Printf("[INFO][%s] Not sending backend info since source is default (not swarm)", actionResult.ExecutionId) return data, err := json.Marshal(actionResult) if err != nil { log.Printf("[ERROR][%s] Shutting down (24): Failed to unmarshal data for backend: %s", actionResult.ExecutionId, err) return } if actionResult.ExecutionId == "TBD" { return } log.Printf("[DEBUG][%s] Sending FAILURE to self to stop the workflow execution. 
Action: %s (%s), app %s:%s", actionResult.ExecutionId, actionResult.Action.Label, actionResult.Action.ID, actionResult.Action.AppName, actionResult.Action.AppVersion) // Literally sending to same worker to run it as a new request streamUrl := fmt.Sprintf("http://localhost:33333/api/v1/streams") hostenv := os.Getenv("WORKER_HOSTNAME") if len(hostenv) > 0 { streamUrl = fmt.Sprintf("http://%s:33333/api/v1/streams", hostenv) } req, err := http.NewRequest( "POST", streamUrl, bytes.NewBuffer([]byte(data)), ) if err != nil { log.Printf("[ERROR][%s] Failed creating self request (1): %s", actionResult.ExecutionId, err) return } newresp, err := topClient.Do(req) if err != nil { log.Printf("[ERROR][%s] Error running self request (2): %s", actionResult.ExecutionId, err) return } defer newresp.Body.Close() if newresp.Body != nil { body, err := ioutil.ReadAll(newresp.Body) //log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode) if err != nil { log.Printf("[ERROR][%s] Failed reading self request body: %s", actionResult.ExecutionId, err) } else { log.Printf("[DEBUG][%s] NEWRESP (from self - 1): %s", actionResult.ExecutionId, string(body)) } } } func sendResult(workflowExecution shuffle.WorkflowExecution, data []byte) { if workflowExecution.ExecutionSource == "default" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm" { //log.Printf("[INFO][%s] Not sending backend info since source is default (not swarm)", workflowExecution.ExecutionId) //return } streamUrl := fmt.Sprintf("%s/api/v1/streams", baseUrl) req, err := http.NewRequest( "POST", streamUrl, bytes.NewBuffer([]byte(data)), ) if err != nil { log.Printf("[ERROR][%s] Failed creating finishing request: %s", workflowExecution.ExecutionId, err) log.Printf("[DEBUG][%s] Shutting down (22)", workflowExecution.ExecutionId) shutdown(workflowExecution, "", "", false) return } newresp, err := topClient.Do(req) if err != nil { log.Printf("[ERROR][%s] Error running finishing request: %s", 
workflowExecution.ExecutionId, err) log.Printf("[DEBUG][%s] Shutting down (23)", workflowExecution.ExecutionId) shutdown(workflowExecution, "", "", false) return } defer newresp.Body.Close() if newresp.Body != nil { body, err := ioutil.ReadAll(newresp.Body) //log.Printf("[INFO] BACKEND STATUS: %d", newresp.StatusCode) if err != nil { log.Printf("[ERROR][%s] Failed reading body: %s", workflowExecution.ExecutionId, err) } else { log.Printf("[DEBUG][%s] NEWRESP (from backend): %s", workflowExecution.ExecutionId, string(body)) } } } func validateFinished(workflowExecution shuffle.WorkflowExecution) bool { ctx := context.Background() newexec, err := shuffle.GetWorkflowExecution(ctx, workflowExecution.ExecutionId) if err != nil { log.Printf("[ERROR][%s] Failed getting workflow execution: %s", workflowExecution.ExecutionId, err) } else { workflowExecution = *newexec } //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId) workflowExecution = shuffle.Fixexecution(ctx, workflowExecution) _, extra, _, _, _, _, _, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId) log.Printf("[INFO][%s] VALIDATION. Status: %s, shuffle.Actions: %d, Extra: %d, Results: %d. 
Parent: %#v\n", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), extra, len(workflowExecution.Results), workflowExecution.ExecutionParent)

	//if len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
	// Single-environment optimized runs report as soon as any result exists; otherwise
	// wait until every action (plus trigger "extra" slots) has a result.
	if (len(environments) == 1 && requestsSent == 0 && len(workflowExecution.Results) >= 1 && os.Getenv("SHUFFLE_SWARM_CONFIG") != "run" && os.Getenv("SHUFFLE_SWARM_CONFIG") != "swarm") || (len(workflowExecution.Results) >= len(workflowExecution.Workflow.Actions)+extra && len(workflowExecution.Workflow.Actions) > 0) {
		if workflowExecution.Status == "FINISHED" {
			// Never ship a "finished" execution that still has unfinished results.
			for _, result := range workflowExecution.Results {
				if result.Status == "EXECUTING" || result.Status == "WAITING" {
					log.Printf("[WARNING] NOT returning full result, as a result may be unfinished: %s (%s) - %s", result.Action.Label, result.Action.ID, result.Status)
					return false
				}
			}
		}

		requestsSent += 1
		log.Printf("[DEBUG][%s] Should send full result to %s", workflowExecution.ExecutionId, baseUrl)

		//data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
		shutdownData, err := json.Marshal(workflowExecution)
		if err != nil {
			log.Printf("[ERROR][%s] Shutting down (32): Failed to unmarshal data for backend: %s", workflowExecution.ExecutionId, err)
			shutdown(workflowExecution, "", "", true)
		}

		cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
		err = shuffle.SetCache(ctx, cacheKey, shutdownData, 30)
		if err != nil {
			// FIX: previously passed the whole struct to %s; log the execution ID instead.
			log.Printf("[ERROR][%s] Failed adding to cache during validateFinished", workflowExecution.ExecutionId)
		}

		shuffle.RunCacheCleanup(ctx, workflowExecution)
		sendResult(workflowExecution, shutdownData)
		return true
	}

	return false
}

// handleGetStreamResults returns the current state of a workflow execution to a
// caller that supplies a valid execution_id + authorization pair in the body.
func handleGetStreamResults(resp http.ResponseWriter, request *http.Request) {
	body, err := ioutil.ReadAll(request.Body)
	if err != nil {
		log.Printf("[WARNING] Failed reading body for stream result queue")
		resp.WriteHeader(500)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		return
	}
	defer request.Body.Close()

	//log.Printf("[DEBUG] In get stream results with body length %d: %s", len(body), string(body))
	var actionResult shuffle.ActionResult
	err = json.Unmarshal(body, &actionResult)
	if err != nil {
		// Deliberately best-effort: a partial unmarshal may still carry the execution ID.
		log.Printf("[WARNING] Failed shuffle.ActionResult unmarshaling: %s", err)
		//resp.WriteHeader(400)
		//resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err)))
		//return
	}

	if len(actionResult.ExecutionId) == 0 {
		log.Printf("[WARNING] No workflow execution id in action result (2). Data: %s", string(body))
		resp.WriteHeader(400)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "No workflow execution id in action result"}`)))
		return
	}

	ctx := context.Background()
	workflowExecution, err := shuffle.GetWorkflowExecution(ctx, actionResult.ExecutionId)
	if err != nil {
		log.Printf("[INFO] Failed getting execution (streamresult) %s: %s", actionResult.ExecutionId, err)
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
		return
	}

	// Authorization is done here
	if workflowExecution.Authorization != actionResult.Authorization {
		log.Printf("Bad authorization key when getting stream results %s.", actionResult.ExecutionId)
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad authorization key or execution_id might not exist."}`)))
		return
	}

	newjson, err := json.Marshal(workflowExecution)
	if err != nil {
		// NOTE(review): 401 looks odd for a marshalling failure (500 would fit better),
		// but kept as-is since callers may depend on it.
		resp.WriteHeader(401)
		resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed unpacking workflow execution"}`)))
		return
	}

	resp.WriteHeader(200)
	resp.Write(newjson)
}

// setWorkflowExecution caches the (fixed-up) execution and kicks off result
// handling. With dbSave set and a "default" execution source the worker shuts down.
func setWorkflowExecution(ctx context.Context, workflowExecution shuffle.WorkflowExecution, dbSave bool) error {
	if len(workflowExecution.ExecutionId) == 0 {
		log.Printf("[DEBUG] Workflowexecution executionId can't be empty.")
		return errors.New("ExecutionId can't be empty.")
	}

	//log.Printf("[DEBUG][%s] Setting with %d results (pre)", workflowExecution.ExecutionId, len(workflowExecution.Results))
	workflowExecution = shuffle.Fixexecution(ctx, workflowExecution)
	//log.Printf("[DEBUG][%s] Setting with %d results (post)", workflowExecution.ExecutionId, len(workflowExecution.Results))

	cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId)
	execData, err := json.Marshal(workflowExecution)
	if err != nil {
		log.Printf("[ERROR] Failed marshalling execution during set: %s", err)
		return err
	}

	err = shuffle.SetCache(ctx, cacheKey, execData, 30)
	if err != nil {
		// FIX: previously passed the whole struct to %s; log the execution ID instead.
		log.Printf("[ERROR][%s] Failed adding to cache during setexecution", workflowExecution.ExecutionId)
		return err
	}
	//requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration)

	handleExecutionResult(workflowExecution)
	validateFinished(workflowExecution)

	// FIXME: Should this shutdown OR send the result?
	// The worker may not be running the backend hmm
	if dbSave {
		if workflowExecution.ExecutionSource == "default" {
			log.Printf("[DEBUG][%s] Shutting down (25)", workflowExecution.ExecutionId)
			shutdown(workflowExecution, "", "", true)
			//return
		} else {
			log.Printf("[DEBUG] NOT shutting down with dbSave (%s)", workflowExecution.ExecutionSource)
		}
	}

	return nil
}

// GetLocalIP returns the non loopback local IP of the host
func getLocalIP() string {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}

	for _, address := range addrs {
		// check the address type and if it is not a loopback the display it
		if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
			if ipnet.IP.To4() != nil {
				return ipnet.IP.String()
			}
		}
	}

	return ""
}

// getAvailablePort asks the kernel for a free TCP port by listening on :0.
// The caller owns (and must Close) the returned listener.
func getAvailablePort() (net.Listener, error) {
	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Printf("[WARNING] Failed to assign port by default. Defaulting to 5001")
		//return ":5001"
		return nil, err
	}

	//defer listener.Close()
	return listener, nil
	//return fmt.Sprintf(":%d", port)
}

// webserverSetup binds a random local port and rewrites appCallbackUrl so apps
// can call this worker directly.
func webserverSetup(workflowExecution shuffle.WorkflowExecution) net.Listener {
	hostname = getLocalIP()
	os.Setenv("WORKER_HOSTNAME", hostname)

	// FIXME: This MAY not work because of speed between first
	// container being launched and port being assigned to webserver
	listener, err := getAvailablePort()
	if err != nil {
		log.Printf("[ERROR] Failed to create init listener: %s", err)
		// NOTE(review): returns a nil listener on error; callers must tolerate that.
		return listener
	}
	log.Printf("[DEBUG] OLD HOSTNAME: %s", appCallbackUrl)
	port := listener.Addr().(*net.TCPAddr).Port

	log.Printf("\n\n[DEBUG] Starting webserver (2) on port %d with hostname: %s\n\n", port, hostname)
	appCallbackUrl = fmt.Sprintf("http://%s:%d", hostname, port)
	log.Printf("[INFO] NEW WORKER HOSTNAME: %s", appCallbackUrl)

	return listener
}

// downloadDockerImageBackend fetches an image tarball from the Shuffle backend
// and loads it into the local docker daemon, tagging frikky/shuffle copies.
func downloadDockerImageBackend(client *http.Client, imageName string) error {
	log.Printf("[DEBUG] Trying to download image %s from backend %s as it doesn't exist. All images: %#v", imageName, baseUrl, downloadedImages)

	if arrayContains(downloadedImages, imageName) {
		log.Printf("[DEBUG] Image %s already downloaded", imageName)
		return nil
	}

	downloadedImages = append(downloadedImages, imageName)

	data := fmt.Sprintf(`{"name": "%s"}`, imageName)
	dockerImgUrl := fmt.Sprintf("%s/api/v1/get_docker_image", baseUrl)
	req, err := http.NewRequest(
		"POST",
		dockerImgUrl,
		bytes.NewBuffer([]byte(data)),
	)
	// FIX: err was previously unchecked; a nil req would panic on Header.Add below.
	if err != nil {
		log.Printf("[ERROR] Failed building download request for %s: %s", imageName, err)
		return err
	}

	authorization := os.Getenv("AUTHORIZATION")
	if len(authorization) > 0 {
		req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", authorization))
	} else {
		log.Printf("[WARNING] No auth found - running backend download without it.")
		//return
	}

	newresp, err := client.Do(req)
	if err != nil {
		log.Printf("[ERROR] Failed download request for %s: %s", imageName, err)
		return err
	}
	defer newresp.Body.Close()

	if newresp.StatusCode != 200 {
		log.Printf("[ERROR] Docker download for image %s (backend) StatusCode (1): %d", imageName, newresp.StatusCode)
		return errors.New(fmt.Sprintf("Failed to get image - status code %d", newresp.StatusCode))
	}

	// Spool the tarball to disk before handing it to the docker daemon.
	newImageName := strings.Replace(imageName, "/", "_", -1)
	newFileName := newImageName + ".tar"
	tar, err := os.Create(newFileName)
	if err != nil {
		log.Printf("[WARNING] Failed creating file: %s", err)
		return err
	}
	defer tar.Close()

	_, err = io.Copy(tar, newresp.Body)
	if err != nil {
		log.Printf("[WARNING] Failed response body copying: %s", err)
		return err
	}

	// FIX: Seek error was silently ignored; a failed rewind would feed garbage to ImageLoad.
	if _, err = tar.Seek(0, 0); err != nil {
		log.Printf("[WARNING] Failed rewinding image tarball: %s", err)
		return err
	}

	dockercli, err := dockerclient.NewEnvClient()
	if err != nil {
		log.Printf("[ERROR] Unable to create docker client (3): %s", err)
		return err
	}

	imageLoadResponse, err := dockercli.ImageLoad(context.Background(), tar, true)
	if err != nil {
		log.Printf("[ERROR] Error loading images: %s", err)
		return err
	}
	// FIX: the load response body was never closed (connection/resource leak).
	defer imageLoadResponse.Body.Close()

	body, err := ioutil.ReadAll(imageLoadResponse.Body)
	if err != nil {
		log.Printf("[ERROR] Error reading: %s", err)
		return err
	}

	if strings.Contains(string(body), "no such file") {
		return errors.New(string(body))
	}

	baseTag := strings.Split(imageName, ":")
	if len(baseTag) > 1 {
		tag := baseTag[1]
		log.Printf("[DEBUG] Creating tag copies of downloaded containers from tag %s", tag)

		// Remapping
		ctx := context.Background()
		dockercli.ImageTag(ctx, imageName, fmt.Sprintf("frikky/shuffle:%s", tag))
		dockercli.ImageTag(ctx, imageName, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))

		downloadedImages = append(downloadedImages, fmt.Sprintf("frikky/shuffle:%s", tag))
		downloadedImages = append(downloadedImages, fmt.Sprintf("registry.hub.docker.com/frikky/shuffle:%s", tag))
	}

	os.Remove(newFileName)
	log.Printf("[INFO] Successfully loaded image %s: %s", imageName, string(body))
	return nil
}

// Initial loop etc
func main() {
	// Elasticsearch necessary to ensure we'ren ot running with Datastore configurations for minimal/maximal data sizes
	_, err := shuffle.RunInit(datastore.Client{}, storage.Client{}, "", "worker", true, "elasticsearch")
	if err != nil {
		log.Printf("[ERROR] Failed to run worker init: %s", err)
	} else {
		log.Printf("[DEBUG] Ran init for worker to set up cache system. Docker version: %s", dockerApiVersion)
	}

	log.Printf("[INFO] Setting up worker environment")
	sleepTime := 5
	client := shuffle.GetExternalClient(baseUrl)

	if timezone == "" {
		timezone = "Europe/Amsterdam"
	}

	log.Printf("[INFO] Running with timezone %s and swarm config %#v", timezone, os.Getenv("SHUFFLE_SWARM_CONFIG"))

	authorization := ""
	executionId := ""

	// INFO: Allows you to run a test execution
	testing := os.Getenv("WORKER_TESTING_WORKFLOW")
	shuffle_apikey := os.Getenv("WORKER_TESTING_APIKEY")
	if len(testing) > 0 && len(shuffle_apikey) > 0 {
		// Execute a workflow and use that info
		log.Printf("[WARNING] Running test environment for worker by executing workflow %s. PS: This may NOT reach the worker in real time, but rather be deployed as a docker container (bad). Instead use AUTHORIZATION and EXECUTIONID for direct testing", testing)
		authorization, executionId = runTestExecution(client, testing, shuffle_apikey)
	} else {
		authorization = os.Getenv("AUTHORIZATION")
		executionId = os.Getenv("EXECUTIONID")
		// NOTE(review): this logs the raw authorization secret; consider redacting.
		log.Printf("[INFO] Running normal execution with auth %s and ID %s", authorization, executionId)
	}

	workflowExecution := shuffle.WorkflowExecution{
		ExecutionId: executionId,
	}
	if len(authorization) == 0 {
		log.Printf("[INFO] No AUTHORIZATION key set in env")
		log.Printf("[DEBUG] Shutting down (27)")
		shutdown(workflowExecution, "", "", false)
	}

	if len(executionId) == 0 {
		log.Printf("[INFO] No EXECUTIONID key set in env")
		log.Printf("[DEBUG] Shutting down (28)")
		shutdown(workflowExecution, "", "", false)
	}

	data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, executionId, authorization)
	streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl)

	req, err := http.NewRequest(
		"POST",
		streamResultUrl,
		bytes.NewBuffer([]byte(data)),
	)
	if err != nil {
		log.Printf("[ERROR] Failed making request builder for backend")
		log.Printf("[DEBUG] Shutting down (29)")
		shutdown(workflowExecution, "", "", true)
	}

	topClient = client
	firstRequest := true
	environments := []string{}
	for {
		// Because of this, it always has updated data.
// Removed request requirement from app_sdk newresp, err := client.Do(req) if err != nil { log.Printf("[ERROR] Failed request: %s", err) time.Sleep(time.Duration(sleepTime) * time.Second) continue } defer newresp.Body.Close() body, err := ioutil.ReadAll(newresp.Body) if err != nil { log.Printf("[ERROR] Failed reading body: %s", err) time.Sleep(time.Duration(sleepTime) * time.Second) continue } if newresp.StatusCode != 200 { log.Printf("[ERROR] %s\nStatusCode (1): %d", string(body), newresp.StatusCode) time.Sleep(time.Duration(sleepTime) * time.Second) continue } err = json.Unmarshal(body, &workflowExecution) if err != nil { log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err) time.Sleep(time.Duration(sleepTime) * time.Second) continue } if firstRequest { firstRequest = false //workflowExecution.StartedAt = int64(time.Now().Unix()) ctx := context.Background() cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) execData, err := json.Marshal(workflowExecution) if err != nil { log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err) } else { err = shuffle.SetCache(ctx, cacheKey, execData, 30) if err != nil { log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err) } } //requestCache = cache.New(60*time.Minute, 120*time.Minute) //requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration) for _, action := range workflowExecution.Workflow.Actions { found := false for _, environment := range environments { if action.Environment == environment { found = true break } } if !found { environments = append(environments, action.Environment) } } // Checks if a subflow is child of the startnode, as sub-subflows aren't working properly yet childNodes := shuffle.FindChildNodes(workflowExecution, workflowExecution.Start, []string{}, []string{}) log.Printf("[DEBUG] Looking for subflow in %#v to check execution pattern as child 
of %s", childNodes, workflowExecution.Start) subflowFound := false for _, childNode := range childNodes { for _, trigger := range workflowExecution.Workflow.Triggers { if trigger.ID != childNode { continue } if trigger.AppName == "Shuffle Workflow" { subflowFound = true break } } if subflowFound { break } } log.Printf("\n\nEnvironments: %s. Source: %s. 1 env = webserver, 0 or >1 = default. Subflow exists: %#v\n\n", environments, workflowExecution.ExecutionSource, subflowFound) if len(environments) == 1 && workflowExecution.ExecutionSource != "default" && !subflowFound { log.Printf("\n\n[DEBUG] Running OPTIMIZED execution (not manual)\n\n") listener := webserverSetup(workflowExecution) err := executionInit(workflowExecution) if err != nil { log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err) log.Printf("[DEBUG] Shutting down (30)") shutdown(workflowExecution, "", "", true) } go func() { time.Sleep(time.Duration(1)) handleExecutionResult(workflowExecution) }() runWebserver(listener) //log.Printf("Before wait") //wg := sync.WaitGroup{} //wg.Add(1) //wg.Wait() } else { log.Printf("\n\n[DEBUG] Running NON-OPTIMIZED execution for type %s with %d environment(s). This only happens when ran manually OR when running with subflows. Status: %s\n\n", workflowExecution.ExecutionSource, len(environments), workflowExecution.Status) err := executionInit(workflowExecution) if err != nil { log.Printf("[DEBUG] Workflow setup failed: %s", workflowExecution.ExecutionId, err) shutdown(workflowExecution, "", "", true) } // Trying to make worker into microservice~ :) } } if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" { log.Printf("[DEBUG] Workflow %s is finished. 
Exiting worker.", workflowExecution.ExecutionId) log.Printf("[DEBUG] Shutting down (31)") shutdown(workflowExecution, "", "", true) } if workflowExecution.Status == "EXECUTING" || workflowExecution.Status == "RUNNING" { //log.Printf("Status: %s", workflowExecution.Status) err = handleDefaultExecution(client, req, workflowExecution) if err != nil { log.Printf("[DEBUG] Workflow %s is finished: %s", workflowExecution.ExecutionId, err) log.Printf("[DEBUG] Shutting down (32)") shutdown(workflowExecution, "", "", true) } } else { log.Printf("[DEBUG] Workflow %s has status %s. Exiting worker.", workflowExecution.ExecutionId, workflowExecution.Status) log.Printf("[DEBUG] Shutting down (33)") shutdown(workflowExecution, workflowExecution.Workflow.ID, "", true) } time.Sleep(time.Duration(sleepTime) * time.Second) } } func checkUnfinished(resp http.ResponseWriter, request *http.Request, execRequest shuffle.OrborusExecutionRequest) { // Meant as a function that periodically checks whether previous executions have finished or not. // Should probably be based on executedIds and finishedIds // Schedule a check in the future instead? ctx := context.Background() exec, err := shuffle.GetWorkflowExecution(ctx, execRequest.ExecutionId) log.Printf("[DEBUG][%s] Rechecking execution and it's status to send to backend IF the status is EXECUTING (%s - %d/%d finished)", execRequest.ExecutionId, exec.Status, len(exec.Results), len(exec.Workflow.Actions)) if err != nil { return } // FIXMe: Does this create issue with infinite loops? // Usually caused by issue during startup if exec.Status == "" { //handleRunExecution(resp, request) return } if exec.Status != "EXECUTING" { return } log.Printf("[DEBUG][%s] Should send full result for execution to backend as it has %d results. 
Status: %s", execRequest.ExecutionId, len(exec.Results), exec.Status) data, err := json.Marshal(exec) if err != nil { return } sendResult(*exec, data) } func handleRunExecution(resp http.ResponseWriter, request *http.Request) { body, err := ioutil.ReadAll(request.Body) if err != nil { log.Printf("[WARNING] Failed reading body for stream result queue") resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) return } defer request.Body.Close() //log.Printf("[DEBUG] In run execution with body length %d", len(body)) var execRequest shuffle.OrborusExecutionRequest err = json.Unmarshal(body, &execRequest) if err != nil { log.Printf("[WARNING] Failed shuffle.WorkflowExecution unmarshaling: %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) return } // Checks if a workflow is done 30 seconds later, and sends info to backend no matter what go func() { time.Sleep(time.Duration(30) * time.Second) checkUnfinished(resp, request, execRequest) }() // FIXME: This should be PER EXECUTION //if strings.ToLower(os.Getenv("SHUFFLE_PASS_APP_PROXY")) == "true" { // Is it ok if these are standard? Should they be update-able after launch? 
Hmm if len(execRequest.HTTPProxy) > 0 { log.Printf("[DEBUG] Sending proxy info to child process") os.Setenv("SHUFFLE_PASS_APP_PROXY", execRequest.ShufflePassProxyToApp) } if len(execRequest.HTTPProxy) > 0 { log.Printf("[DEBUG] Running with default HTTP proxy %s", execRequest.HTTPProxy) os.Setenv("HTTP_PROXY", execRequest.HTTPProxy) } if len(execRequest.HTTPSProxy) > 0 { log.Printf("[DEBUG] Running with default HTTPS proxy %s", execRequest.HTTPSProxy) os.Setenv("HTTPS_PROXY", execRequest.HTTPSProxy) } if len(execRequest.EnvironmentName) > 0 { os.Setenv("ENVIRONMENT_NAME", execRequest.EnvironmentName) environment = execRequest.EnvironmentName } if len(execRequest.Timezone) > 0 { os.Setenv("TZ", execRequest.Timezone) timezone = execRequest.Timezone } if len(execRequest.Cleanup) > 0 { os.Setenv("CLEANUP", execRequest.Cleanup) cleanupEnv = execRequest.Cleanup } if len(execRequest.BaseUrl) > 0 { os.Setenv("BASE_URL", execRequest.BaseUrl) baseUrl = execRequest.BaseUrl } // Setting to just have an auth available. 
if len(execRequest.Authorization) > 0 && len(os.Getenv("AUTHORIZATION")) == 0 { //log.Printf("[DEBUG] Sending proxy info to child process") os.Setenv("AUTHORIZATION", execRequest.Authorization) } topClient = &http.Client{} var workflowExecution shuffle.WorkflowExecution data = fmt.Sprintf(`{"execution_id": "%s", "authorization": "%s"}`, execRequest.ExecutionId, execRequest.Authorization) streamResultUrl := fmt.Sprintf("%s/api/v1/streams/results", baseUrl) req, err := http.NewRequest( "POST", streamResultUrl, bytes.NewBuffer([]byte(data)), ) newresp, err := topClient.Do(req) if err != nil { log.Printf("[ERROR] Failed making request (2): %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) return } body, err = ioutil.ReadAll(newresp.Body) if err != nil { log.Printf("[ERROR] Failed reading body (2): %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) return } if newresp.StatusCode != 200 { log.Printf("[ERROR] Bad statuscode: %d, %s", newresp.StatusCode, string(body)) if strings.Contains(string(body), "Workflowexecution is already finished") { log.Printf("[DEBUG] Shutting down (19)") //shutdown(workflowExecution, "", "", true) } resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad statuscode: %d"}`, newresp.StatusCode))) return } err = json.Unmarshal(body, &workflowExecution) if err != nil { log.Printf("[ERROR] Failed workflowExecution unmarshal: %s", err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "%s"}`, err))) return } ctx := context.Background() err = setWorkflowExecution(ctx, workflowExecution, true) if err != nil { log.Printf("[ERROR] Failed initializing execution saving for %s: %s", workflowExecution.ExecutionId, err) } if workflowExecution.Status == "FINISHED" || workflowExecution.Status == "SUCCESS" { log.Printf("[DEBUG] Workflow %s is finished. 
Exiting worker.", workflowExecution.ExecutionId) log.Printf("[DEBUG] Shutting down (20)") resp.WriteHeader(200) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status for execution - already %s. Returning with 200 OK"}`, workflowExecution.Status))) return } //startAction, extra, children, parents, visited, executed, nextActions, environments := shuffle.GetExecutionVariables(ctx, workflowExecution.ExecutionId) extra := 0 for _, trigger := range workflowExecution.Workflow.Triggers { //log.Printf("Appname trigger (0): %s", trigger.AppName) if trigger.AppName == "User Input" || trigger.AppName == "Shuffle Workflow" { extra += 1 } } log.Printf("[INFO][%s] Status: %s, Results: %d, actions: %d", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Results), len(workflowExecution.Workflow.Actions)+extra) if workflowExecution.Status != "EXECUTING" { log.Printf("[WARNING] Exiting as worker execution has status %s!", workflowExecution.Status) log.Printf("[DEBUG] Shutting down (21)") resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Bad status %s for the workflow execution %s"}`, workflowExecution.Status, workflowExecution.ExecutionId))) return } //log.Printf("[DEBUG] Starting execution :O") cacheKey := fmt.Sprintf("workflowexecution_%s", workflowExecution.ExecutionId) execData, err := json.Marshal(workflowExecution) if err != nil { log.Printf("[ERROR][%s] Failed marshalling execution during set (3): %s", workflowExecution.ExecutionId, err) } else { err = shuffle.SetCache(ctx, cacheKey, execData, 30) if err != nil { log.Printf("[ERROR][%s] Failed adding to cache during setexecution (3): %s", workflowExecution.ExecutionId, err) } } //requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration) err = executionInit(workflowExecution) if err != nil { log.Printf("[DEBUG][%s] Shutting down (30) - Workflow setup failed: %s", workflowExecution.ExecutionId, workflowExecution.ExecutionId, err) 
resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Error in execution init: %s"}`, err))) return //shutdown(workflowExecution, "", "", true) } //go handleExecutionResult(workflowExecution) handleExecutionResult(workflowExecution) resp.WriteHeader(200) resp.Write([]byte(fmt.Sprintf(`{"success": true}`))) } func runWebserver(listener net.Listener) { r := mux.NewRouter() r.HandleFunc("/api/v1/streams", handleWorkflowQueue).Methods("POST", "OPTIONS") r.HandleFunc("/api/v1/streams/results", handleGetStreamResults).Methods("POST", "OPTIONS") //log.Fatal(http.ListenAndServe(port, nil)) //srv := http.Server{ // Addr: ":8888", // WriteTimeout: 1 * time.Second, // Handler: http.HandlerFunc(slowHandler), //} //log.Fatal(http.Serve(listener, nil)) log.Printf("\n\n[DEBUG] NEW webserver setup\n\n") http.Handle("/", r) srv := http.Server{ Handler: r, ReadTimeout: 60 * time.Second, ReadHeaderTimeout: 60 * time.Second, IdleTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second, } err := srv.Serve(listener) if err != nil { log.Printf("serveIssue: %#v", err) } log.Printf("[DEBUG] Do we see this?") }