Golem Python API Reference

Golem

class yapapi.Golem(*, budget, strategy=None, subnet_tag=None, driver=None, payment_driver=None, network=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)

The main entrypoint of Golem's high-level API.

Its principal role is providing an interface to run the requestor’s payload using one of two modes of operation - executing tasks and running services.

The first one, available through execute_tasks(), instructs Golem to take a sequence of tasks that the user wishes to compute on Golem and distribute those among the providers.

The second one, invoked with run_service(), makes Golem spawn a certain number of instances of a service based on a single service specification (a specialized implementation inheriting from Service).

While the two modes are not necessarily completely disjoint - in that we can create a service that exists to process a certain number of computations and, similarly, we can use the task model to run some service - the main difference lies in the lifetime of such a job.

Whereas a task-based job exists for the purpose of computing the specific sequence of tasks and is done once the whole sequence has been processed, the service-based job is created for a potentially indefinite period and the services spawned within it are kept alive for as long as they’re needed.

Additionally, the service interface provides a way to easily define handlers for certain, discrete phases of a lifetime of each service instance - startup, running and shutdown.

Internally, Golem’s job includes running the engine which takes care of first finding the providers interested in the jobs the requestors want to execute, then negotiating agreements with them and facilitating the execution of those jobs and lastly, processing payments. For this reason, it’s usually good to have just one instance of Golem operative at any given time.

__init__(*, budget, strategy=None, subnet_tag=None, driver=None, payment_driver=None, network=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)

Initialize Golem engine.

Parameters
  • budget (Union[float, Decimal]) – maximum budget for payments

  • strategy (Optional[MarketStrategy]) – market strategy used to select providers from the market (e.g. yapapi.strategy.LeastExpensiveLinearPayuMS or yapapi.strategy.DummyMS)

  • subnet_tag (Optional[str]) – use only providers in the subnet with the subnet_tag name. Uses YAGNA_SUBNET environment variable, defaults to None

  • driver (Optional[str]) – deprecated, please use payment_driver instead

  • payment_driver (Optional[str]) – name of the payment driver to use. Uses YAGNA_PAYMENT_DRIVER environment variable, defaults to zksync. Only payment platforms with the specified driver will be used

  • network (Optional[str]) – deprecated, please use payment_network instead

  • payment_network (Optional[str]) – name of the network to use. Uses YAGNA_NETWORK environment variable, defaults to rinkeby. Only payment platforms with the specified network will be used

  • event_consumer (Optional[Callable[[Event], None]]) – a callable that processes events related to the computation; by default it is a function that logs all events

  • stream_output (bool) – stream computation output from providers

  • app_key (Optional[str]) – optional Yagna application key. If not provided, the default is to get the value from YAGNA_APPKEY environment variable
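
Several of the parameters above fall back to an environment variable and then to a built-in default. A minimal sketch of that lookup order follows; `resolve_setting` is a hypothetical helper, not part of yapapi, and the assumption that an explicitly passed argument wins over the environment variable is ours.

```python
import os
from typing import Mapping, Optional

# Fallback values quoted from the parameter descriptions above.
DEFAULT_PAYMENT_DRIVER = "zksync"
DEFAULT_PAYMENT_NETWORK = "rinkeby"


def resolve_setting(
    explicit: Optional[str],
    env_var: str,
    default: Optional[str] = None,
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Hypothetical helper: explicit argument, then environment variable, then default."""
    if explicit is not None:
        return explicit
    return environ.get(env_var, default)


# With nothing passed and nothing exported, the documented default is used.
driver = resolve_setting(None, "YAGNA_PAYMENT_DRIVER", DEFAULT_PAYMENT_DRIVER, environ={})
```

Passing `environ` explicitly keeps the helper easy to exercise without touching the real process environment.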

async create_network(ip, owner_ip=None, mask=None, gateway=None)

Create a VPN inside Golem network.

Requires yagna >= 0.8

Parameters
  • ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”

  • owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created Network

  • mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)

  • gateway (Optional[str]) – Optional gateway address for the network

Return type

Network
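
The ip argument may carry the netmask inline, or the mask may be passed separately. The sketch below shows the two equivalent spellings using only the standard ipaddress module; `parse_network` is a hypothetical helper, and letting the inline mask win when both are given is an assumption rather than documented yapapi behavior.

```python
import ipaddress
from typing import Optional, Union

IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]


def parse_network(ip: str, mask: Optional[str] = None) -> IPNetwork:
    """Hypothetical helper: build a network from `ip`, with the mask inline or separate."""
    if "/" in ip or mask is None:
        # mask already embedded ("192.168.0.0/24"), or no mask given at all
        return ipaddress.ip_network(ip)
    # separate mask, e.g. ip="192.168.0.0", mask="255.255.255.0"
    return ipaddress.ip_network(f"{ip}/{mask}")


inline = parse_network("192.168.0.0/24")
separate = parse_network("192.168.0.0", mask="255.255.255.0")
```

Both calls describe the same network, so either form should be usable when calling create_network().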

async execute_tasks(worker, data, payload, max_workers=None, timeout=None, job_id=None, implicit_init=True)

Submit a sequence of tasks to be executed on providers.

Internally, this method creates an instance of yapapi.executor.Executor and calls its submit() method with given worker function and sequence of tasks.

Parameters
  • worker (Callable[[WorkContext, AsyncIterator[Task[~D, ~R]]], AsyncGenerator[Script, Awaitable[List[CommandEvent]]]]) – an async generator that takes a WorkContext object and a sequence of tasks, and generates a sequence of scripts to be executed on providers in order to compute given tasks

  • data (Union[AsyncIterator[Task[~D, ~R]], Iterable[Task[~D, ~R]]]) – an iterable or an async generator of Task objects to be computed on providers

  • payload (Payload) – specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the created Executor instance

  • max_workers (Optional[int]) – maximum number of concurrent workers, passed to the Executor instance

  • timeout (Optional[timedelta]) – timeout for computing all tasks, passed to the Executor instance

  • job_id (Optional[str]) – an optional string to identify the job created by this method. Passed as the value of the id parameter to yapapi.engine.Job.

  • implicit_init (bool) – True -> deploy() and start() will be called internally by the Executor. False -> those calls must be in the worker function

Return type

AsyncIterator[Task[~D, ~R]]

Returns

an async iterator that yields completed Task objects

example usage:

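from typing import AsyncIterable

from yapapi import Golem, Task, WorkContext
from yapapi.payload import vm


async def worker(context: WorkContext, tasks: AsyncIterable[Task]):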
    async for task in tasks:
        context.run("/bin/sh", "-c", "date")

        future_results = yield context.commit()
        results = await future_results
        task.accept_result(result=results[-1])

package = await vm.repo(
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
    async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
        print(completed.result.stdout)

async run_service(service_class, num_instances=None, instance_params=None, payload=None, expiration=None, respawn_unstarted_instances=True, network=None, network_addresses=None)

Run a number of instances of a service represented by a given Service subclass.

Parameters
  • service_class (Type[Service]) – a subclass of Service that represents the service to be run

  • num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will be created with as many instances as there are elements in the instance_params iterable. If num_instances is set to a value less than 1, the Cluster will still be created but no instances will be spawned within it.

  • instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the Cluster will be created with the number of instances determined by num_instances and if there are too few elements in the instance_params iterable, it will result in an error.

  • payload (Optional[Payload]) – optional runtime definition for the service; if not provided, the payload specified by the get_payload() method of service_class is used

  • expiration (Optional[datetime]) – optional expiration datetime for the service

  • respawn_unstarted_instances (bool) – whether the returned Cluster should try to spawn another instance when an instance fails in the starting state

  • network (Optional[Network]) – optional Network, representing a VPN to attach this Cluster’s instances to

  • network_addresses (Optional[List[str]]) – optional list of addresses to assign to consecutive spawned instances. If there are too few addresses given in the network_addresses iterable to satisfy all spawned instances, the rest (or all when the list is empty or not provided at all) of the addresses will be assigned automatically. Requires the network argument to be provided at the same time.
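
The interplay between num_instances and instance_params described above can be summarized in a short sketch. `plan_instances` is a hypothetical helper, not yapapi code, and raising ValueError for a too-short iterable is an assumption (the text only says that an error results).

```python
from itertools import islice
from typing import Any, Dict, Iterable, List, Optional


def plan_instances(
    num_instances: Optional[int] = None,
    instance_params: Optional[Iterable[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: return one kwargs dict per instance to spawn."""
    if num_instances is not None and num_instances < 1:
        return []  # the Cluster exists, but nothing is spawned
    if instance_params is None:
        # no per-instance kwargs: default to a single instance
        return [{} for _ in range(num_instances or 1)]
    if num_instances is None:
        # the iterable alone decides how many instances are spawned
        return list(instance_params)
    # both given: num_instances takes precedence
    params = list(islice(instance_params, num_instances))
    if len(params) < num_instances:
        raise ValueError("instance_params has fewer elements than num_instances")
    return params
```

For example, `plan_instances(2, [{"a": 1}, {"a": 2}, {"a": 3}])` keeps only the first two dictionaries, while `plan_instances(3, [{"a": 1}])` raises.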

example usage:

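import asyncio
from datetime import datetime, timedelta

from yapapi import Golem
from yapapi.payload import vm
from yapapi.services import Service

DATE_OUTPUT_PATH = "/golem/work/date.txt"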
REFRESH_INTERVAL_SEC = 5


class DateService(Service):
    @staticmethod
    async def get_payload():
        return await vm.repo(
            image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
        )

    async def start(self):
        async for script in super().start():
            yield script

        # every `REFRESH_INTERVAL_SEC` write output of `date` to `DATE_OUTPUT_PATH`
        script = self._ctx.new_script()
        script.run(
            "/bin/sh",
            "-c",
            f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
        )
        yield script

    async def run(self):
        while True:
            await asyncio.sleep(REFRESH_INTERVAL_SEC)
            script = self._ctx.new_script()
            future_result = script.run(
                "/bin/sh",
                "-c",
                f"cat {DATE_OUTPUT_PATH}",
            )

            yield script

            result = (await future_result).stdout
            print(result.strip() if result else "")


async def main():
    async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
        cluster = await golem.run_service(DateService, num_instances=1)
        start_time = datetime.now()

        while datetime.now() < start_time + timedelta(minutes=1):
            for num, instance in enumerate(cluster.instances):
                print(f"Instance {num} is {instance.state.value} on {instance.provider_name}")
            await asyncio.sleep(REFRESH_INTERVAL_SEC)

Return type

Cluster

Task API

Task

class yapapi.Task(data)

One computation unit.

Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).

__init__(data)

Create a new Task object.

Parameters

data (~TaskData) – contains information needed to prepare command list for the provider

accept_result(result=None)

Accept the result of this task.

Must be called when the result is correct to mark this task as completed.

Parameters

result (Optional[~TaskResult]) – task computation result (optional)

Return type

None

reject_result(reason=None, retry=False)

Reject the result of this task.

Must be called when the result is not correct to indicate that the task should be retried.

Parameters

reason (Optional[str]) – task rejection description (optional)

Return type

None

property running_time: Optional[datetime.timedelta]

Return the running time of the task (if in progress) or time it took to complete it.

Return type

Optional[timedelta]

Service API

Service

class yapapi.services.Service(cluster, ctx, network_node=None)

Base class for service specifications.

To be extended by application developers to define their own, specialized Service specifications.

async static get_payload()

Return the payload (runtime) definition for this service.

To be overridden by the author of a specific Service class.

If get_payload() is not implemented, the payload will need to be provided in the run_service() call.

Return type

Optional[Payload]

property id: str

Return the id of this service instance.

Guaranteed to be unique within a Cluster.

Return type

str

property is_available

Return True iff this instance is available (that is, starting, running or stopping).

property network: Optional[yapapi.network.Network]

Return the Network to which this instance belongs (if any)

Return type

Optional[Network]

property network_node: Optional[yapapi.network.Node]

Return the network Node record associated with this instance.

Return type

Optional[Node]

property provider_name: Optional[str]

Return the name of the provider that runs this service instance.

Return type

Optional[str]

async receive_message()

Wait for a control message sent to this instance.

Return type

ServiceSignal

receive_message_nowait()

Retrieve a control message sent to this instance.

Return None if no message is available.

Return type

Optional[ServiceSignal]

async run()

Implement the handler for the running state of the service.

To be overridden by the author of a specific Service class.

Should contain any operations needed to ensure continuous operation of a service.

As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.

Results of those batches can then be retrieved by awaiting the values captured from yield statements.

A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to stopping.

Any unhandled exception will cause the instance to be terminated.

Example:

async def run(self):
    while True:
        script = self._ctx.new_script()
        stats_results = script.run(self.SIMPLE_SERVICE, "--stats")
        yield script
        stats = (await stats_results).stdout.strip()
        print(f"stats: {stats}")

Default implementation

Because the nature of the operations required during the “running” state depends directly on the specifics of a given Service and because it’s entirely plausible for a service not to require any direct interaction with the exe-unit (runtime) from the requestor’s end after the service has been started, the default is to just wait indefinitely without producing any batches.

Return type

AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
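
The work generator pattern used by the handlers can be pictured with plain asyncio objects. In the sketch below, strings stand in for Script objects and `drive` stands in for the engine; both are illustrative assumptions and say nothing about how yapapi implements its engine.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, List


async def handler() -> AsyncGenerator[str, Awaitable[List[str]]]:
    """Stand-in handler: yields two 'scripts' and awaits the results of each."""
    for script in ("date", "uptime"):
        future_results = yield script  # the driver sends an awaitable back in
        results = await future_results
        print(f"{script} -> {results}")


async def drive(gen) -> List[str]:
    """Stand-in engine loop: 'run' each yielded script, send back an awaitable."""
    loop = asyncio.get_running_loop()
    executed = []
    try:
        script = await gen.__anext__()  # run the handler up to its first yield
        while True:
            future = loop.create_future()
            future.set_result([f"output of {script}"])  # pretend the provider ran it
            executed.append(script)
            script = await gen.asend(future)  # resume the handler, get the next script
    except StopAsyncIteration:
        pass  # clean exit: the handler has finished
    return executed


executed_scripts = asyncio.run(drive(handler()))
```

The value of each `yield` expression is whatever the driver passes to `asend()`, which is why a handler can await the results of the batch it has just yielded.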

async send_message(message=None)

Send a control message to this instance.

send_message_nowait(message=None)

Send a control message to this instance without blocking.

May raise asyncio.QueueFull if the channel for sending control messages is full.
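
The blocking and non-blocking variants mirror the behavior of a bounded asyncio.Queue. The class below is a stand-in built on that standard-library queue to show the difference; it is an assumption for illustration, not the channel yapapi uses internally.

```python
import asyncio
from typing import Optional


class ControlChannel:
    """Stand-in for an instance's control-message channel (not a yapapi class)."""

    def __init__(self, maxsize: int = 1) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def send_message(self, message=None) -> None:
        await self._queue.put(message)  # waits while the queue is full

    def send_message_nowait(self, message=None) -> None:
        self._queue.put_nowait(message)  # raises asyncio.QueueFull when full

    async def receive_message(self):
        return await self._queue.get()  # waits until a message arrives

    def receive_message_nowait(self) -> Optional[object]:
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None  # nothing pending


async def demo() -> list:
    channel = ControlChannel(maxsize=1)
    log = [channel.receive_message_nowait()]  # empty channel yields None
    channel.send_message_nowait("stop")
    try:
        channel.send_message_nowait("stop again")  # channel already holds one message
    except asyncio.QueueFull:
        log.append("full")
    log.append(await channel.receive_message())
    return log


demo_log = asyncio.run(demo())
```

Callers that cannot await should be prepared to catch asyncio.QueueFull, as the demo does.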

async shutdown()

Implement the handler for the stopping state of the service.

To be overridden by the author of a specific Service class.

Should contain any operations that the requestor needs to ensure the instance is correctly and gracefully shut-down - e.g. that its final state is retrieved.

As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.

Results of those batches can then be retrieved by awaiting the values captured from yield statements.

Finishing the execution of this handler will trigger termination of this instance.

This handler will only be called if the activity running the service is still available. If the activity has already been deemed terminated or if the connection with the provider has been lost, the service will transition to the terminated state and the shutdown handler won’t be run.

Example:

async def shutdown(self):
    self._ctx.run("/golem/run/dump_state")
    self._ctx.download_file("/golem/output/state", "/some/local/path/state")
    self._ctx.terminate()
    yield self._ctx.commit()

Default implementation

By default, the activity is just sent a terminate command. Whether it’s absolutely required or not, again, depends on the implementation of the given runtime.

Return type

AsyncGenerator[Script, Awaitable[List[CommandEvent]]]

async start()

Implement the handler for the starting state of the service.

To be overridden by the author of a specific Service class.

Should perform the minimum set of operations after which the instance of a service can be treated as “started”, or, in other words, ready to receive service requests. It’s up to the developer of the specific Service class to decide what exact operations constitute a service startup. In the most common scenario, deploy() and start() are required; see the Default implementation section for more details.

As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.

Results of those batches can then be retrieved by awaiting the values captured from yield statements.

A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to running.

On the other hand, any unhandled exception will cause the instance to be either retried on another provider node, if the Cluster’s respawn_unstarted_instances argument is set to True in run_service(), which is also the default behavior, or altogether terminated, if respawn_unstarted_instances is set to False.

Example:

async def start(self):
    s = self._ctx.new_script()
    # deploy the exe-unit
    s.deploy(**self.get_deploy_args())
    # start the exe-unit's container
    s.start()
    # start some service process within the container
    s.run("/golem/run/service_ctl", "--start")
    # send the batch to the provider
    yield s

Default implementation

The default implementation assumes that, in order to accept commands, the runtime needs to be first deployed using the deploy() command, which is analogous to creation of a container corresponding with the desired payload, and then started using the start() command, actually launching the process that runs the aforementioned container.

Additionally, it also assumes that the exe-unit doesn’t need any additional parameters in its start() call (e.g. for the VM runtime, all the required parameters are already passed as part of the agreement between the requestor and the provider), and parameters passed to deploy() are returned by Service.get_deploy_args() method.

Therefore, this default implementation performs the minimum required for a VM payload to start responding to run commands. If your service requires any additional operations - you’ll need to override this method (possibly first yielding from the parent - super().start() - generator) to add appropriate preparatory steps.

In case of runtimes other than VM, deploy and/or start might be optional or altogether disallowed, or they may take some parameters. It is up to the author of the specific Service implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type.

Return type

AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
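
Overriding start() while keeping the parent's behavior means re-yielding everything the parent generator produces before adding further steps. The sketch below uses stand-in classes and lists of command names instead of real Script objects; the class names and commands are illustrative assumptions.

```python
import asyncio
from typing import AsyncGenerator, List


class BaseService:
    """Stand-in base class; each 'script' is just a list of command names."""

    async def start(self) -> AsyncGenerator[List[str], None]:
        yield ["deploy", "start"]  # minimum needed before run commands are accepted


class MyService(BaseService):
    async def start(self) -> AsyncGenerator[List[str], None]:
        # async generators cannot use `yield from`, so re-yield the parent's scripts
        async for script in super().start():
            yield script
        # extra preparatory step, issued only after the parent's scripts
        yield ["run /golem/run/service_ctl --start"]


async def collect() -> List[List[str]]:
    return [script async for script in MyService().start()]


scripts = asyncio.run(collect())
```

Note that this simple re-yield loop does not forward values sent into the child generator to the parent, which is acceptable when the parent does not inspect the results of its own scripts.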

property state

Return the current state of this instance.

Cluster

class yapapi.services.Cluster(engine, service_class, payload, expiration=None, respawn_unstarted_instances=True, network=None)

Golem’s sub-engine used to spawn and control instances of a single Service.

emit(event)

Emit an event using this Cluster’s engine.

Return type

None

property expiration: datetime.datetime

Return the expiration datetime for agreements related to services in this Cluster.

Return type

datetime

get_state(service)

Return the state of the specific instance in this Cluster.

Return type

ServiceState

property instances: List[yapapi.services.Service]

Return the list of service instances in this Cluster.

Return type

List[Service]

property network: Optional[yapapi.network.Network]

Return the Network record associated with the VPN used by this Cluster.

Return type

Optional[Network]

property payload: yapapi.payload.Payload

Return the service runtime definition for this Cluster.

Return type

Payload

property service_class: Type[yapapi.services.Service]

Return the class instantiated by all service instances in this Cluster.

Return type

Type[Service]

async spawn_instance(params, network_address=None)

Spawn a new service instance within this Cluster.

Return type

None

spawn_instances(num_instances=None, instance_params=None, network_addresses=None)

Spawn new instances within this Cluster.

Parameters
  • num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will spawn as many instances as there are elements in the instance_params iterable. If num_instances is not None and is less than 1, the method will immediately return and log a warning.

  • instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the number of instances spawned will be equal to num_instances and if there are too few elements in the instance_params iterable, it will result in an error.

  • network_addresses (Optional[List[str]]) – optional list of network addresses in case the Cluster is attached to VPN. If the list is not provided (or if the number of elements is less than the number of spawned instances), any instances for which the addresses have not been given, will be assigned an address automatically.

Return type

None
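
The network_addresses rule (explicit addresses first, the remainder assigned automatically) can be sketched with the standard ipaddress module. `assign_addresses` is a hypothetical helper; picking the lowest free host address and raising ValueError when the network is exhausted are assumptions, since the text does not say how automatic addresses are chosen.

```python
import ipaddress
from itertools import islice
from typing import List, Optional


def assign_addresses(
    network: str,
    count: int,
    requested: Optional[List[str]] = None,
    taken: Optional[List[str]] = None,
) -> List[str]:
    """Hypothetical helper: use requested addresses first, then fill the gaps."""
    requested = list(requested or [])[:count]
    used = set(taken or []) | set(requested)
    # lazily walk the usable host addresses, skipping those already in use
    free = (str(h) for h in ipaddress.ip_network(network).hosts() if str(h) not in used)
    automatic = list(islice(free, count - len(requested)))
    if len(requested) + len(automatic) < count:
        raise ValueError("not enough free addresses in the network")
    return requested + automatic


# One explicit address for three instances; 192.168.0.1 is assumed to be the requestor's.
addresses = assign_addresses(
    "192.168.0.0/24", 3, requested=["192.168.0.10"], taken=["192.168.0.1"]
)
```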

stop()

Signal the whole Cluster to stop.

stop_instance(service)

Stop the specific Service instance belonging to this Cluster.

ServiceState

class yapapi.services.ServiceState(model=None, state_field='state', start_value=None)

State machine describing the state and lifecycle of a Service instance.

Network API

Network

class yapapi.network.Network(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)

Describes a VPN created between the requestor and the provider nodes within Golem Network.

async add_node(node_id, ip=None)

Add a new node to the network.

Parameters
  • node_id (str) – Node ID within the Golem network of this VPN node.

  • ip (Optional[str]) – IP address to assign to this node.

Return type

Node

async add_owner_address(ip)

Assign the given IP address to the requestor in the network.

Parameters

ip (str) – the IP address to assign to the requestor node.

async classmethod create(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)

Create a new VPN.

Parameters
  • net_api (Net) – the mid-level binding used directly to perform calls to the REST API.

  • ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”

  • owner_id (str) – the node ID of the owner of this VPN (the requestor)

  • owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created network

  • mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)

  • gateway (Optional[str]) – Optional gateway address for the network

Return type

Network

property gateway: Optional[str]

The gateway address within this network, if provided.

Return type

Optional[str]

property netmask: str

The netmask of this network.

Return type

str

property network_address: str

The network address of this network, without a netmask.

Return type

str

property network_id: str

The automatically-generated, unique ID of this VPN.

Return type

str

property nodes_dict: Dict[str, str]

Mapping between the IP addresses and Node IDs of the nodes within this network.

Return type

Dict[str, str]

property owner_ip: str

The IP address of the requestor node within the network.

Return type

str

Node

class yapapi.network.Node(network, node_id, ip)

Describes a node in a VPN, mapping a Golem node id to an IP address.

get_deploy_args()

Generate a dictionary of arguments that are required for the appropriate Deploy command of an exescript in order to pass the network configuration to the runtime on the provider’s end.

Return type

Dict

ip: str

IP address of this node in this particular VPN.

network: yapapi.network.Network

The Network (the specific VPN) this node is part of.

node_id: str

Golem id of the node.

Exceptions

class yapapi.network.NetworkError

Exception raised by Network when an operation is not possible

Payload definition

Payload

class yapapi.payload.Payload(**kwargs)

Base class for descriptions of the payload required by the requestor.

example usage:

import asyncio

from dataclasses import dataclass
from yapapi.props.builder import DemandBuilder
from yapapi.props.base import prop, constraint
from yapapi.props import inf

from yapapi.payload import Payload

CUSTOM_RUNTIME_NAME = "my-runtime"
CUSTOM_PROPERTY = "golem.srv.app.myprop"


@dataclass
class MyPayload(Payload):
    myprop: str = prop(CUSTOM_PROPERTY, default="myvalue")
    runtime: str = constraint(inf.INF_RUNTIME_NAME, default=CUSTOM_RUNTIME_NAME)
    min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16)
    min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024)


async def main():
    builder = DemandBuilder()
    payload = MyPayload(myprop="othervalue", min_mem_gib=32)
    await builder.decorate(payload)
    print(builder)

asyncio.run(main())

output:

{'properties': {'golem.srv.app.myprop': 'othervalue'}, 'constraints': ['(&(golem.runtime.name=my-runtime)
(golem.inf.mem.gib>=32)
(golem.inf.storage.gib>=1024))']}
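
The output above combines the individual constraints into a single filter expression, while properties are kept as a plain mapping. A minimal sketch of that combination follows; `join_constraints` and `build_demand` are hypothetical helpers, and the newline separator between clauses is an assumption based on the layout shown above.

```python
from typing import Dict, List


def join_constraints(constraints: List[str], operator: str = "&") -> str:
    """Hypothetical helper: wrap the clauses in a single '(&...)' expression."""
    return f"({operator}" + "\n".join(constraints) + ")"


def build_demand(properties: Dict[str, object], constraints: List[str]) -> dict:
    """Hypothetical helper: assemble the dictionary printed in the example."""
    return {
        "properties": dict(properties),
        "constraints": [join_constraints(constraints)],
    }


demand = build_demand(
    {"golem.srv.app.myprop": "othervalue"},
    [
        "(golem.runtime.name=my-runtime)",
        "(golem.inf.mem.gib>=32)",
        "(golem.inf.storage.gib>=1024)",
    ],
)
```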

Package

class yapapi.payload.package.Package

Description of a task package (e.g. a VM image) deployed on the provider nodes

vm.repo

async yapapi.payload.vm.repo(*, image_hash, image_url=None, min_mem_gib=0.5, min_storage_gib=2.0, min_cpu_threads=1, capabilities=None)

Build a reference to application package.

Parameters
  • image_hash (str) – hash of the package’s image

  • image_url (Optional[str]) – URL of the package’s image

  • min_mem_gib (float) – minimal memory required to execute application code

  • min_storage_gib (float) – minimal disk storage to execute tasks

  • min_cpu_threads (int) – minimal available logical CPU cores

  • capabilities (Optional[List[Literal[‘vpn’]]]) – an optional list of required vm capabilities

Return type

Package

Returns

the payload definition for the given VM image

example usage:

package = await vm.repo(
    # if we provide only the image hash, the image will be
    # automatically pulled from Golem's image repository
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

example usage with an explicit GVMI image URL (useful to host images outside the Golem repository):

package = await vm.repo(
    # we still need to provide the image's hash because
    # the image's integrity is validated by the runtime on the provider node
    #
    # the hash can be calculated by running `sha3sum -a 224 <image_filename.gvmi>`
    #
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",

    # the URL can point to any publicly-available location on the web
    image_url="http://girepo.dev.golem.network:8000/docker-golem-hello-world-latest-779758b432.gvmi",
)
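
Where the sha3sum tool is not available, the same SHA3-224 digest can be computed with Python's standard hashlib module. `gvmi_image_hash` is a hypothetical helper name; the sketch assumes the image hash is the plain SHA3-224 digest of the file contents, as the sha3sum command in the comment above suggests.

```python
import hashlib


def gvmi_image_hash(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA3-224 hex digest of the file at `path`."""
    digest = hashlib.sha3_224()
    with open(path, "rb") as image:
        # read in chunks so that large images need not fit in memory
        for chunk in iter(lambda: image.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The returned string has 56 hexadecimal characters, the same length as the image_hash values used in the examples.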

example usage with additional constraints:

package = await vm.repo(
    image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
    # only run on provider nodes that have at least 0.5 GiB of RAM available
    min_mem_gib=0.5,
    # only run on provider nodes that have at least 2 GiB of storage space available
    min_storage_gib=2.0,
    # only run on provider nodes that have at least this many CPU threads available
    min_cpu_threads=1,
)

Execution control

WorkContext

class yapapi.WorkContext(activity, agreement_details, storage, emitter=None)

Provider node’s work context.

Used to schedule commands to be sent to the provider and enable other interactions between the requestor agent’s client code and the activity on provider’s end.

async get_cost()

Get the accumulated cost of the activity based on the reported usage.

Return type

Optional[float]

async get_raw_state()

Get the state of the activity bound to this work context.

The value comes directly from the low level API and is not interpreted in any way.

Return type

ActivityState

async get_raw_usage()

Get the raw usage vector for the activity bound to this work context.

The value comes directly from the low level API and is not interpreted in any way.

Return type

ActivityUsage

async get_usage()

Get the current usage for the activity bound to this work context.

Return type

ActivityUsage

property id: str

Unique identifier for this work context.

Return type

str

new_script(timeout=None, wait_for_results=True)

Create an instance of Script attached to this WorkContext instance.

This is equivalent to calling Script(work_context). This method is intended to provide a direct link between the two object instances.

Return type

Script

property provider_id: str

Return the id of the provider associated with this work context.

Return type

str

property provider_name: Optional[str]

Return the name of the provider associated with this work context.

Return type

Optional[str]

Script

class yapapi.script.Script(context, timeout=None, wait_for_results=True)

Represents a series of commands to be executed on a provider node.

New commands are added to the script either through its add() method or by calling one of the convenience methods provided (for example: run() or upload_json()). Adding a new command does not result in it being immediately executed. Once ready, a Script instance is meant to be yielded from a worker function (work generator pattern). Commands will be run in the order in which they were added to the script.

__init__(context, timeout=None, wait_for_results=True)

Initialize a Script

Parameters
  • context (WorkContext) – A yapapi.WorkContext that will be used to evaluate the script (i.e. to send commands to the provider)

  • timeout (Optional[timedelta]) – Time after which this script’s execution should be forcefully interrupted. The default value is None which means there’s no timeout set.

  • wait_for_results (bool) – Whether this script’s execution should block until its results are available. The default value is True.

add(cmd)

Add a yapapi.script.command.Command to the Script

Return type

Awaitable[CommandExecuted]

deploy(**kwargs)

Schedule a Deploy command on the provider.

Return type

Awaitable[CommandExecuted]

download_bytes(src_path, on_download, limit=1048576)

Schedule downloading a remote file from the provider as bytes.

Parameters
  • src_path (str) – remote (provider) source path

  • on_download (Callable[[bytes], Awaitable]) – the callable to run on the received data

  • limit (int) – limit of bytes to be downloaded (expected size)

Return type

Awaitable[CommandExecuted]

download_file(src_path, dst_path)

Schedule downloading a remote file from the provider.

Parameters
  • src_path (str) – remote (provider) source path

  • dst_path (str) – local (requestor) destination path

Return type

Awaitable[CommandExecuted]

download_json(src_path, on_download, limit=1048576)

Schedule downloading a remote file from the provider as JSON.

Parameters
  • src_path (str) – remote (provider) source path

  • on_download (Callable[[Any], Awaitable]) – the callable to run on the received data

  • limit (int) – limit of bytes to be downloaded (expected size)

Return type

Awaitable[CommandExecuted]

property id: int

Return the ID of this Script instance.

IDs are provided by a global iterator and therefore are guaranteed to be unique during the program’s execution.

Return type

int
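
A global iterator that hands out unique IDs can be pictured with itertools.count. The class below is a stand-in that shows only the ID mechanism; the class name and the starting value of 1 are assumptions.

```python
import itertools


class SketchScript:
    """Stand-in for Script, showing only how unique IDs could be handed out."""

    _ids = itertools.count(1)  # shared by all instances; starting at 1 is an assumption

    def __init__(self) -> None:
        self._id: int = next(SketchScript._ids)

    @property
    def id(self) -> int:
        return self._id


first, second = SketchScript(), SketchScript()
```

Because the counter lives on the class rather than on an instance, every new object draws the next value and no two objects share an ID within one program run.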

run(cmd, *args, env=None, stderr=None, stdout=None)

Schedule running a shell command on the provider.

Parameters
  • cmd (str) – command to run on the provider, e.g. /my/dir/run.sh

  • args (str) – command arguments, e.g. “input1.txt”, “output1.txt”

  • env (Optional[Dict[str, str]]) – optional dictionary with environment variables

  • stderr (Optional[CaptureContext]) – capture context to use for stderr

  • stdout (Optional[CaptureContext]) – capture context to use for stdout

Return type

Awaitable[CommandExecuted]

start(*args)

Schedule a Start command on the provider.

Return type

Awaitable[CommandExecuted]

terminate()

Schedule a Terminate command on the provider.

Return type

Awaitable[CommandExecuted]

upload_bytes(data, dst_path)

Schedule sending bytes data to the provider.

Parameters
  • data (bytes) – bytes to send

  • dst_path (str) – remote (provider) destination path

Return type

Awaitable[CommandExecuted]

upload_file(src_path, dst_path)

Schedule sending a file to the provider.

Parameters
  • src_path (str) – local (requestor) source path

  • dst_path (str) – remote (provider) destination path

Return type

Awaitable[CommandExecuted]

upload_json(data, dst_path)

Schedule sending JSON data to the provider.

Parameters
  • data (dict) – dictionary representing JSON data to send

  • dst_path (str) – remote (provider) destination path

Return type

Awaitable[CommandExecuted]

Market strategies

class yapapi.strategy.MarketStrategy

Abstract market strategy.

async decorate_demand(demand)

Optionally add relevant constraints to a Demand.

Return type

None

async score_offer(offer, history=None)

Score offer. Better offers should get higher scores.

Return type

float

class yapapi.strategy.DummyMS(max_fixed_price=Decimal('0.05'), max_price_for=mappingproxy({}), activity=None)

A default market strategy implementation.

Its score_offer() method returns SCORE_NEUTRAL for every offer whose prices do not exceed the maximum prices specified for each counter. For all other offers, it returns SCORE_REJECTED.
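
The price-cap rule described above can be sketched as follows. This is an illustration only: the constant values, the counter names ("fixed", "cpu_sec") and the helper score_by_price_caps are assumptions made for this sketch, not the library's definitions.

```python
from decimal import Decimal
from typing import Mapping

# Placeholder values for this sketch; the library defines its own constants.
SCORE_NEUTRAL = 0.0
SCORE_REJECTED = -1.0


def score_by_price_caps(
    offered: Mapping[str, Decimal], max_price_for: Mapping[str, Decimal]
) -> float:
    """Reject an offer if any capped counter is priced above its maximum."""
    for counter, cap in max_price_for.items():
        if offered.get(counter, Decimal(0)) > cap:
            return SCORE_REJECTED
    return SCORE_NEUTRAL


caps = {"fixed": Decimal("0.05"), "cpu_sec": Decimal("0.001")}
cheap = score_by_price_caps(
    {"fixed": Decimal("0.01"), "cpu_sec": Decimal("0.0005")}, caps
)
pricey = score_by_price_caps(
    {"fixed": Decimal("0.10"), "cpu_sec": Decimal("0.0005")}, caps
)
```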

class yapapi.strategy.LeastExpensiveLinearPayuMS(expected_time_secs=60, max_fixed_price=Decimal('Infinity'), max_price_for=mappingproxy({}))

A strategy that scores offers according to cost for given computation time.
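
To illustrate what "cost for given computation time" can mean under a linear pay-per-use model, the sketch below computes an expected cost as a fixed price plus a per-second price multiplied by expected_time_secs, then ranks offers from cheapest to most expensive. The provider names, prices and the expected_cost helper are hypothetical, and how the library converts such a cost into a score is not specified here.

```python
from decimal import Decimal
from typing import Dict, List, Tuple


def expected_cost(
    fixed_price: Decimal, price_per_sec: Decimal, expected_time_secs: int = 60
) -> Decimal:
    """Linear pay-per-use cost: fixed part plus usage over the expected time."""
    return fixed_price + price_per_sec * expected_time_secs


# Hypothetical offers: (fixed price, price per second).
offers: Dict[str, Tuple[Decimal, Decimal]] = {
    "provider-a": (Decimal("0.00"), Decimal("0.002")),
    "provider-b": (Decimal("0.05"), Decimal("0.001")),
    "provider-c": (Decimal("0.20"), Decimal("0.0001")),
}

# Cheapest expected cost first.
ranked: List[str] = sorted(offers, key=lambda name: expected_cost(*offers[name]))
```

With the default of 60 seconds, provider-b comes out cheapest even though its fixed price is higher than provider-a's, which is why the expected duration matters when comparing offers.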

class yapapi.strategy.DecreaseScoreForUnconfirmedAgreement(base_strategy, factor)

A market strategy that modifies a base strategy based on history of agreements.

Exceptions

exception yapapi.NoPaymentAccountError(required_driver, required_network)

The error raised if no payment account for the required driver/network is available.

exception yapapi.rest.activity.BatchTimeoutError

An exception that indicates that an execution of a batch of commands timed out.

Logging

class yapapi.log.SummaryLogger(wrapped_emitter=None)

Aggregates information from computation events to provide a high-level summary.

The logger’s log() method can be used as the event_consumer callback when initializing Golem. It aggregates the generated events and outputs summary information.

The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with log() is first passed to wrapped_emitter.

For example, with the following setup, each event will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.

from yapapi import Golem
from yapapi.log import SummaryLogger, log_event_repr

detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
golem = Golem(..., event_consumer=summary_logger)

log(event)

Register an event.

Return type

None

yapapi.log.enable_default_logger(format_='[%(asctime)s %(levelname)s %(name)s] %(message)s', log_file=None, debug_activity_api=False, debug_market_api=False, debug_payment_api=False, debug_net_api=False)

Enable the default logger that logs messages to stderr with level INFO.

If log_file is specified, the logger will also output messages with level DEBUG to the given file.
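
The behaviour described above (INFO and higher to stderr, DEBUG and higher to an optional file) can be approximated with the standard logging module alone. The sketch below is not the library's implementation; configure_logging and the logger name "sketch" are hypothetical, and only the format string is taken from the signature above.

```python
import logging
import os
import sys
import tempfile
from typing import Optional

FORMAT = "[%(asctime)s %(levelname)s %(name)s] %(message)s"


def configure_logging(logger_name: str, log_file: Optional[str] = None) -> logging.Logger:
    """Attach an INFO-level stderr handler and, optionally, a DEBUG-level file handler."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)  # let each handler apply its own threshold
    formatter = logging.Formatter(FORMAT)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.INFO)
    stderr_handler.setFormatter(formatter)
    logger.addHandler(stderr_handler)

    if log_file is not None:
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger


log_path = os.path.join(tempfile.mkdtemp(), "sketch-debug.log")
logger = configure_logging("sketch", log_file=log_path)
logger.debug("written to the file only")
logger.info("written to stderr and to the file")

for handler in logger.handlers:
    handler.flush()
with open(log_path, encoding="utf-8") as fh:
    file_lines = fh.read().splitlines()
```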

yapapi.log.log_summary(wrapped_emitter=None)

Output a summary of computation.

This is a utility function that creates a SummaryLogger instance wrapping an optional wrapped_emitter and returns its log() method.

See the documentation of SummaryLogger for more information.
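
The relationship between log_summary() and SummaryLogger, a factory that returns a bound log() method, can be sketched without yapapi. FakeSummaryLogger and fake_log_summary below are hypothetical stand-ins; the real SummaryLogger aggregates far more than an event count.

```python
from typing import Any, Callable, List, Optional


class FakeSummaryLogger:
    """Hypothetical stand-in for SummaryLogger; it only counts events."""

    def __init__(self, wrapped_emitter: Optional[Callable[[Any], None]] = None) -> None:
        self._wrapped_emitter = wrapped_emitter
        self.event_count = 0

    def log(self, event: Any) -> None:
        # Forward the event to the wrapped emitter first, then aggregate.
        if self._wrapped_emitter is not None:
            self._wrapped_emitter(event)
        self.event_count += 1


def fake_log_summary(
    wrapped_emitter: Optional[Callable[[Any], None]] = None,
) -> Callable[[Any], None]:
    """Create a logger instance and hand back its bound log() method."""
    return FakeSummaryLogger(wrapped_emitter).log


seen: List[Any] = []
consumer = fake_log_summary(wrapped_emitter=seen.append)
for event in ("started", "finished"):
    consumer(event)
```

The returned callable is what would be passed as event_consumer; the logger instance stays alive because the bound method holds a reference to it.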

Utils

yapapi.windows_event_loop_fix()

Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.

This workaround is only needed for Python 3.6 and 3.7. Starting with Python 3.8, ProactorEventLoop is already the default on Windows.
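
For readers curious what such a workaround involves, here is a minimal standard-library sketch. It is not the library's implementation: use_proactor_loop_on_old_windows is a hypothetical helper, and it relies on asyncio.WindowsProactorEventLoopPolicy, which exists only on Windows and only from Python 3.7, so this sketch does not cover 3.6.

```python
import asyncio
import sys


def use_proactor_loop_on_old_windows() -> bool:
    """Switch to the proactor policy on Windows with Python older than 3.8.

    Returns True if the event loop policy was changed, False otherwise.
    """
    if sys.platform == "win32" and sys.version_info < (3, 8):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
        return True
    return False


changed = use_proactor_loop_on_old_windows()


async def main() -> str:
    # Report which event loop class is actually in use.
    return type(asyncio.get_running_loop()).__name__


loop_name = asyncio.run(main())
```

On any other platform or Python version the helper is a no-op, so it is safe to call unconditionally before creating an event loop.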

yapapi.get_version()

Return type

str

Returns

the version of the yapapi library package