JS Task API Examples: composing tasks

Introduction

Each Task Executor method takes a task function as a parameter. This function is asynchronous and receives the WorkContext object as one of its parameters.

A task function may be very simple, consisting of a single command, or it may consist of a set of steps that include running commands or sending data to and from providers.

Commands can be run in sequence or can be chained in batches. Depending on how you define your batch, you can obtain results of different types.

The following commands are currently available:

Command           Available in Node.js   Available in web browser
run()             yes                    yes
runAndStream()    yes                    yes
uploadFile()      yes                    no
uploadJson()      yes                    yes
downloadFile()    yes                    no
uploadData()      yes                    yes
downloadData()    no                     yes
downloadJson()    no                     yes
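
The availability table can also be written down as a small lookup, which may help when the same code base targets both Node.js and the browser. This is only a sketch: the `COMMANDS` object and the `commandsFor()` helper are hypothetical names and not part of the Task API; the yes/no values are copied from the table.

```javascript
// Hypothetical lookup built from the table above; not part of the SDK.
const COMMANDS = {
  run: { node: true, browser: true },
  runAndStream: { node: true, browser: true },
  uploadFile: { node: true, browser: false },
  uploadJson: { node: true, browser: true },
  downloadFile: { node: true, browser: false },
  uploadData: { node: true, browser: true },
  downloadData: { node: false, browser: true },
  downloadJson: { node: false, browser: true },
};

// Returns the names of the commands usable in the given environment
// ("node" or "browser").
function commandsFor(environment) {
  return Object.keys(COMMANDS).filter((name) => COMMANDS[name][environment]);
}

console.log(commandsFor("browser").join(", "));
```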
This article focuses on the run() and runAndStream() commands and on chaining commands with the beginBatch() method. Examples for the uploadFile(), uploadJson(), and downloadFile() commands can be found in the Sending Data article.

We'll start with a simple example featuring a single run() command. Then, we'll focus on organizing a more complex task that requires a series of steps:

  • send a worker.mjs script to the provider (this is a simple script that prints "Hello Golem World!" in the terminal),
  • run worker.mjs on the provider and save the output to a file (output.txt), and finally
  • download the output.txt file back to your computer.

Prerequisites

Yagna service is installed and running with the try_golem app-key configured.

How to run examples

Create a project folder, initialize a Node.js project, and install the @golem-sdk/task-executor library.

mkdir golem-example
cd golem-example
npm init
npm i @golem-sdk/task-executor

Copy the code into the index.mjs file in the project folder and run:

node index.mjs

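Some of the examples require a simple worker.mjs script. In a POSIX shell such as bash, it can be created with the following command (the single quotes keep the shell from interpreting the parentheses and double quotes):

echo 'console.log("Hello Golem World!");' > worker.mjs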

Running a single command

Below is an example of a simple script that remotely executes node -v.

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";

(async () => {
  const executor = await TaskExecutor.create({
    package: "golem/node:20-alpine",
    logger: pinoPrettyLogger(),
    yagnaOptions: { apiKey: "try_golem" },
  });

  try {
    const result = await executor.run(async (ctx) => (await ctx.run("node -v")).stdout);
    console.log("Task result:", result);
  } catch (err) {
    console.error("An error occurred:", err);
  } finally {
    await executor.shutdown();
  }
})();

Note that ctx.run() accepts a string as an argument. This string is a command invocation, executed exactly as you would type it in a console. The command runs in the folder defined by the WORKDIR entry in your image definition.

Running multiple commands (the straightforward way)

Your task function can consist of multiple steps, all run on the ctx context.

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";

(async () => {
  const executor = await TaskExecutor.create({
    package: "golem/node:20-alpine",
    logger: pinoPrettyLogger(),
    yagnaOptions: { apiKey: "try_golem" },
  });

  try {
    const result = await executor.run(async (ctx) => {
      await ctx.uploadFile("./worker.mjs", "/golem/input/worker.mjs");
      await ctx.run("node /golem/input/worker.mjs > /golem/input/output.txt");
      const result = await ctx.run("cat /golem/input/output.txt");
      await ctx.downloadFile("/golem/input/output.txt", "./output.txt");
      return result.stdout;
    });

    console.log(result);
  } catch (err) {
    console.error("An error occurred:", err);
  } finally {
    await executor.shutdown();
  }
})();

To ensure the proper sequence of execution, all calls must be awaited. We use only the result of the second run() (the cat command) and ignore the others.
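
To see why each call is awaited, the sketch below replaces the provider with a local stand-in. The `createFakeCtx()` helper and its delays are hypothetical and exist only for illustration; nothing here calls the Task API. Each fake command finishes after a different delay, so the steps complete in the intended order only when every call is awaited before the next one starts.

```javascript
// Hypothetical stand-in for the context object: each "command" resolves
// after a delay and records when it finished. No provider is involved.
function createFakeCtx(log) {
  const step = (name, delayMs) =>
    new Promise((resolve) =>
      setTimeout(() => {
        log.push(name);
        resolve({ stdout: name });
      }, delayMs),
    );
  return {
    uploadFile: () => step("upload", 30),
    run: () => step("run", 10),
    downloadFile: () => step("download", 1),
  };
}

// Awaiting each call: the next step starts only after the previous one ends.
async function awaited() {
  const log = [];
  const ctx = createFakeCtx(log);
  await ctx.uploadFile();
  await ctx.run();
  await ctx.downloadFile();
  return log;
}

// Starting all calls at once: the steps finish in order of their delays.
async function notAwaited() {
  const log = [];
  const ctx = createFakeCtx(log);
  await Promise.all([ctx.uploadFile(), ctx.run(), ctx.downloadFile()]);
  return log;
}

awaited().then((log) => console.log("awaited:", log.join(" -> ")));
notAwaited().then((log) => console.log("not awaited:", log.join(" -> ")));
```

In the second variant the download "finishes" before the upload, which on a real provider would mean trying to fetch a file that does not exist yet.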

If you use this approach, each command is sent separately to the provider and then executed.

[Image: Multiple commands output log]

Organizing commands into batches

Now, let's take a look at how you can arrange multiple commands into batches. Depending on how you finalize your batch, you will obtain either:

  • an array of result objects or
  • ReadableStream

Organizing commands into a batch resulting in a Promise of an array of results

Use the beginBatch() method and chain commands followed by .end().

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";

(async () => {
  const executor = await TaskExecutor.create({
    package: "golem/node:20-alpine",
    logger: pinoPrettyLogger(),
    yagnaOptions: { apiKey: "try_golem" },
  });

  try {
    const result = await executor.run(async (ctx) => {
      return (
        await ctx
          .beginBatch()
          .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
          .run("node /golem/input/worker.mjs > /golem/input/output.txt")
          .run("cat /golem/input/output.txt")
          .downloadFile("/golem/input/output.txt", "./output.txt")
          .end()
      )[2]?.stdout;
    });

    console.log(result);
  } catch (error) {
    console.error("Computation failed:", error);
  } finally {
    await executor.shutdown();
  }
})();

All commands chained after .beginBatch() run in sequence. The chain is terminated with .end(), which returns a Promise of an array of result objects. The results are stored at indices matching their position in the command chain (the first command after beginBatch() has an index of 0).

The output of the 3rd command, run('cat /golem/input/output.txt'), is under the index of 2.
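
The index rule can be tried locally on a mocked result array. This is a minimal sketch: `batchResults` is hypothetical data shaped after the fields this article uses (`index` and `stdout`), the `null` values for commands without console output are an assumption, and `stdoutAt()` is an illustrative helper, not an SDK function.

```javascript
// Hypothetical batch output: one result object per chained command,
// in the order the commands were chained after beginBatch().
// The null stdout values are an assumption made for this sketch.
const batchResults = [
  { index: 0, stdout: null }, // uploadFile
  { index: 1, stdout: null }, // run: node worker.mjs > output.txt
  { index: 2, stdout: "Hello Golem World!\n" }, // run: cat output.txt
  { index: 3, stdout: null }, // downloadFile
];

// Returns the trimmed stdout of the command at the given position,
// or undefined when the position is out of range or has no text output.
function stdoutAt(results, position) {
  const stdout = results[position]?.stdout;
  return typeof stdout === "string" ? stdout.trim() : undefined;
}

console.log(stdoutAt(batchResults, 2)); // prints "Hello Golem World!"
```

The optional chaining mirrors the `[2]?.stdout` access in the example above, so a shorter-than-expected array yields undefined instead of throwing.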

[Image: Commands batch end output logs]

Organizing commands into a batch producing a Readable stream

To produce a Readable stream, use the beginBatch() method and chain commands, followed by .endStream().

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";

(async () => {
  const executor = await TaskExecutor.create({
    package: "golem/node:20-alpine",
    logger: pinoPrettyLogger(),
    yagnaOptions: { apiKey: "try_golem" },
  });

  try {
    const result = await executor.run(async (ctx) => {
      const res = await ctx
        .beginBatch()
        .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
        .run("node /golem/input/worker.mjs > /golem/input/output.txt")
        .run("cat /golem/input/output.txt")
        .downloadFile("/golem/input/output.txt", "./output.txt")
        .endStream();

      return new Promise((resolve) => {
        res.on("data", (result) => console.log(result));
        res.on("error", (error) => console.error(error));
        res.once("close", resolve);
      });
    });
  } catch (error) {
    console.error("Computation failed:", error);
  } finally {
    await executor.shutdown();
  }
})();

Note that in this case, as the chain ends with .endStream(), we can read data chunks from the Readable stream stored in res.

Once the stream is closed, we can terminate our TaskExecutor instance.

[Image: Commands batch endstream output logs]

Since closing the chain with .endStream() produces a Readable stream, you can also iterate over the results with for await...of, handling each chunk in order as it arrives:

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";
(async () => {
  const executor = await TaskExecutor.create({
    package: "golem/node:20-alpine",
    logger: pinoPrettyLogger(),
    yagnaOptions: { apiKey: "try_golem" },
  });

  try {
    const result = await executor.run(async (ctx) => {
      const res = await ctx
        .beginBatch()
        .uploadFile("./worker.mjs", "/golem/input/worker.mjs")
        .run("node /golem/input/worker.mjs > /golem/input/output.txt")
        .run("cat /golem/input/output.txt")
        .downloadFile("/golem/input/output.txt", "./output.txt")
        .endStream();

      for await (const chunk of res) {
        if (chunk.index === 2) console.log(chunk.stdout);
      }
    });
  } catch (err) {
    console.error("Task encountered an error:", err);
  } finally {
    await executor.shutdown();
  }
})();
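
The same iteration pattern can be exercised without a provider. In the sketch below, `fakeResultStream()` is a hypothetical async generator standing in for the stream returned by .endStream(), and `collectStdout()` is an illustrative helper; the chunk fields (`index`, `stdout`) follow the example above, while the `null` values are an assumption.

```javascript
// Hypothetical stand-in for the stream returned by endStream(): an async
// generator that yields one result object per command.
async function* fakeResultStream() {
  yield { index: 0, stdout: null };
  yield { index: 1, stdout: null };
  yield { index: 2, stdout: "Hello Golem World!\n" };
  yield { index: 3, stdout: null };
}

// Collects the stdout of the selected commands while iterating the stream.
async function collectStdout(stream, wantedIndices) {
  const collected = [];
  for await (const chunk of stream) {
    if (wantedIndices.includes(chunk.index)) collected.push(chunk.stdout);
  }
  return collected;
}

collectStdout(fakeResultStream(), [2]).then((out) => console.log(out));
```

Because for await...of works with any async iterable, the helper accepts the generator here and would accept a Node.js Readable stream in the same way.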

Running commands and collecting output as a stream

Here are two examples of how to run a command and collect its output as a stream.

Basic runAndStream scenario

In the first example, we run a command that produces both stdout and stderr outputs that we pass to the console. This command will terminate on its own after ten cycles.

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";

(async function main() {
  const executor = await TaskExecutor.create({
    // What do you want to run
    package: "golem/alpine:latest",
    logger: pinoPrettyLogger(),
    yagnaOptions: { apiKey: "try_golem" },
    budget: 0.5,
    // Control the execution of tasks
    taskTimeout: 5 * 60 * 1000,
  });

  try {
    let result = await executor.run(async (ctx) => {
      console.log("Provider deployed");

      await ctx.run(
        `echo 'counter=0; while [ $counter -lt 10 ]; do ls ./home non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
      );

      await ctx.run("chmod 700 ./script.sh");

      let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh");

      remoteProcess.stdout.on("data", (data) => console.log("stdout: ", data));
      remoteProcess.stderr.on("data", (data) => console.error("stderr: ", data));

      await new Promise((resolve) => {
        remoteProcess.stdout.on("close", resolve);
      });
      return 0;
    });
    console.log(result);
  } catch (err) {
    console.error("Running the task on Golem failed due to", err);
  } finally {
    await executor.shutdown();
  }
})();

runAndStream scenario with timeout defined

In this example, we show how to use remoteProcess.waitForExit() with a timeout to limit how long we wait for the process to finish. Note that in the current implementation, an exit caused by the timeout terminates the activity on the provider, so you cannot run another command on that provider. The Task Executor will instead run the next task on another provider.

import { TaskExecutor, pinoPrettyLogger } from "@golem-sdk/task-executor";

const executor = await TaskExecutor.create({
  // What do you want to run
  package: "golem/alpine:latest",
  // yagnaOptions: { apiKey: "try_golem" },
  logger: pinoPrettyLogger(),
  budget: 0.5,
  maxParallelTasks: 1,
});

// the example will run a task 4 times, in sequence (as maxParallelTasks is 1)
for (const i of [1, 2, 3, 4]) {
  await executor
    .run(async (ctx) => {
      // each task will spawn a script that generates a sequence of 5 pairs of messages sent to stdout and stderr separated by 1 sec delay

      // the command generating the sequence is saved to script.sh file
      await ctx.run(
        `echo 'counter=0; while [ $counter -lt 5 ]; do ls -ls ./script.sh non-existing-file; sleep 1; counter=$(($counter+1)); done' > script.sh`,
      );
      // permissions are modified to be able to run the script
      await ctx.run("chmod 700 ./script.sh");

      // the script is run with streaming; its stdout and stderr are processed as they arrive
      let remoteProcess = await ctx.runAndStream("/bin/sh ./script.sh");

      remoteProcess.stdout.on("data", (data) => console.log(`iteration: ${i}:`, "stdout>", data));
      remoteProcess.stderr.on("data", (data) => console.error(`iteration: ${i}:`, "stderr>", data));

      // For odd tasks, we set the streaming timeout to 10 secs, so the script will end normally;
      // for even tasks, we will exit the run method after 3 secs.
      // The exit caused by timeout will terminate the activity on a provider,
      // therefore the user cannot run another command on the provider.
      // Task executor will run the next task on another provider.

      const timeout = i % 2 === 0 ? 3_000 : 10_000;
      const finalResult = await remoteProcess.waitForExit(timeout).catch(async (e) => {
        console.log(`Iteration: ${i} Error: ${e.message}, Provider: ${e.provider.name}`);
        ctx
          .run("ls -l")
          .catch((e) =>
            console.log("Running command after runAndStream exit caused by timeout is NOT possible, you will get an error:\n", e),
          );
      });
      if (finalResult) {
        // if the spawn exited without timeout, the provider is still available
        console.log(`Iteration: ${i} results: ${finalResult?.result}. Provider: ${ctx.provider.name}`);

        console.log("Running command after normal runAndStream exit is possible:", (await ctx.run("ls -l")).stdout);
      }
    })
    .catch((error) => console.error("Execution of task failed due to error.", error));
}

await executor.shutdown();
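
The general idea of waiting for a process with a time limit can be sketched with Promise.race. This is a generic pattern under stated assumptions, not the SDK's implementation of waitForExit(): `withTimeout()` and `fakeProcess()` are hypothetical helpers, and unlike the behavior described above, nothing is terminated on timeout here; the caller simply stops waiting.

```javascript
// Rejects if `promise` does not settle within `timeoutMs` milliseconds.
// A generic pattern, not the SDK's implementation of waitForExit().
function withTimeout(promise, timeoutMs) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs} ms`)), timeoutMs);
  });
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical stand-in for a remote process that exits after `durationMs`.
const fakeProcess = (durationMs) =>
  new Promise((resolve) => setTimeout(() => resolve({ result: "Ok" }), durationMs));

async function demo() {
  for (const i of [1, 2]) {
    // Even iterations get a timeout shorter than the process duration.
    const timeoutMs = i % 2 === 0 ? 20 : 200;
    const outcome = await withTimeout(fakeProcess(50), timeoutMs)
      .then((res) => res.result)
      .catch((err) => err.message);
    console.log(`iteration ${i}: ${outcome}`);
  }
}

demo();
```

As in the example above, odd iterations finish normally and even iterations hit the timeout, which is reported through the rejected promise.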