Golem Python API Reference

Golem

class yapapi.Golem(*, budget, strategy=None, subnet_tag=None, payment_driver=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)

The main entrypoint of Golem's high-level API.

Its principal role is providing an interface to run the requestor’s payload using one of two modes of operation - executing tasks and running services.

The first one, available through execute_tasks(), instructs Golem to take a sequence of tasks that the user wishes to compute on Golem and distributes those among the providers.

The second one, invoked with run_service(), makes Golem spawn a certain number of instances of a service based on a single service specification (a specialized implementation inheriting from Service).

While the two modes are not necessarily completely disjoint - in that we can create a service that exists to process a certain number of computations and, similarly, we can use the task model to run some service - the main difference lies in the lifetime of such a job.

Whereas a task-based job exists for the purpose of computing the specific sequence of tasks and is done once the whole sequence has been processed, the service-based job is created for a potentially indefinite period and the services spawned within it are kept alive for as long as they’re needed.

Additionally, the service interface provides a way to easily define handlers for certain, discrete phases of a lifetime of each service instance - startup, running and shutdown.

Internally, Golem’s job includes running the engine which takes care of first finding the providers interested in the jobs the requestors want to execute, then negotiating agreements with them and facilitating the execution of those jobs and lastly, processing payments. For this reason, it’s usually good to have just one instance of Golem operative at any given time.

__init__(*, budget, strategy=None, subnet_tag=None, payment_driver=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)

Initialize Golem engine.

Parameters
  • budget (Union[float, Decimal]) – maximum budget for payments

  • strategy (Optional[BaseMarketStrategy]) – market strategy used to select providers from the market (e.g. yapapi.strategy.LeastExpensiveLinearPayuMS or yapapi.strategy.DummyMS)

  • subnet_tag (Optional[str]) – use only providers in the subnet with the subnet_tag name. Uses YAGNA_SUBNET environment variable, defaults to None

  • payment_driver (Optional[str]) – name of the payment driver to use. Uses YAGNA_PAYMENT_DRIVER environment variable, defaults to erc20. Only payment platforms with the specified driver will be used

  • payment_network (Optional[str]) – name of the network to use. Uses YAGNA_NETWORK environment variable, defaults to rinkeby. Only payment platforms with the specified network will be used

  • event_consumer (Optional[Callable[[Event], None]]) – a callable that processes events related to the computation; by default it is a function that logs all events

  • stream_output (bool) – stream computation output from providers

  • app_key (Optional[str]) – optional Yagna application key. If not provided, the default is to get the value from YAGNA_APPKEY environment variable

add_event_consumer(event_consumer, event_classes_or_names=(<class 'yapapi.events.Event'>, ))

Initialize another event_consumer, working just like the event_consumer passed to Golem.__init__()

Parameters
  • event_consumer (Callable[[Event], None]) – A callable that will be executed on every event.

  • event_classes_or_names (Iterable[Union[Type[Event], str]]) – An iterable defining classes of events that should be passed to this event_consumer. Both classes and class names are accepted (in the latter case classes must be available in the yapapi.events namespace). If this argument is omitted, all events inheriting from yapapi.events.Event (i.e. all currently implemented events) will be passed to the event_consumer.

Example usages:

def event_consumer(event: "yapapi.events.Event"):
    print(f"Got an event! {type(event).__name__}")

golem.add_event_consumer(event_consumer)
def event_consumer(event: "yapapi.events.AgreementConfirmed"):
    provider_name = event.agreement.details.provider_node_info.name
    print(f"We're trading with {provider_name}! Nice!")

golem.add_event_consumer(event_consumer, ["AgreementConfirmed"])
async start()

Start the Golem engine in non-contextmanager mode.

The default way of using Golem:

async with Golem(...) as golem:
    # ... work with golem

Is roughly equivalent to:

golem = Golem(...)
try:
    await golem.start()
    # ... work with golem
finally:
    await golem.stop()
A repeated call to Golem.start():
  • If Golem is already starting, or started and wasn’t stopped - will be ignored (and harmless)

  • If Golem was stopped - will initialize a new engine that knows nothing about the previous operations

Return type

None

async stop()

Stop the Golem engine after it was started in non-contextmanager mode.

Details: Golem.start()

Return type

None

async execute_tasks(worker, data, payload, max_workers=None, timeout=None, job_id=None, implicit_init=True)

Submit a sequence of tasks to be executed on providers.

Internally, this method creates an instance of yapapi.executor.Executor and calls its submit() method with given worker function and sequence of tasks.

Parameters
  • worker (Callable[[WorkContext, AsyncIterator[Task[TypeVar(D), TypeVar(R)]]], AsyncGenerator[Script, Awaitable[List[CommandEvent]]]]) – an async generator that takes a WorkContext object and a sequence of tasks, and generates as sequence of scripts to be executed on providers in order to compute given tasks

  • data (Union[AsyncIterator[Task[TypeVar(D), TypeVar(R)]], Iterable[Task[TypeVar(D), TypeVar(R)]]]) – an iterable or an async generator of Task objects to be computed on providers

  • payload (Payload) – specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the created Executor instance

  • max_workers (Optional[int]) – maximum number of concurrent workers, passed to the Executor instance

  • timeout (Optional[timedelta]) – timeout for computing all tasks, passed to the Executor instance

  • job_id (Optional[str]) – an optional string to identify the job created by this method. Passed as the value of the id parameter to yapapi.engine.Job.

  • implicit_init (bool) – True -> deploy() and start() will be called internally by the Executor. False -> those calls must be in the worker function

Return type

AsyncIterator[Task[TypeVar(D), TypeVar(R)]]

Returns

an async iterator that yields completed Task objects

example usage:

async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
    async for task in tasks:
        script = context.new_script()
        future_result = script.run("/bin/sh", "-c", "date")
        yield script
        task.accept_result(result=await future_result)

package = await vm.repo(
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
    async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
        print(completed.result.stdout)
async run_service(service_class, num_instances=None, instance_params=None, payload=None, expiration=None, respawn_unstarted_instances=True, network=None, network_addresses=None)

Run a number of instances of a service represented by a given Service subclass.

Parameters
  • service_class (Type[TypeVar(ServiceType, bound= Service)]) – a subclass of Service that represents the service to be run

  • num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will be created with as many instances as there are elements in the instance_params iterable. if num_instances is set to < 1, the Cluster will still be created but no instances will be spawned within it.

  • instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the Cluster will be created with the number of instances determined by num_instances and if there are too few elements in the instance_params iterable, it will results in an error.

  • payload (Optional[Payload]) – optional runtime definition for the service; if not provided, the payload specified by the get_payload() method of service_class is used

  • expiration (Optional[datetime]) – optional expiration datetime for the service

  • respawn_unstarted_instances – if an instance fails in the starting state, should the returned Cluster try to spawn another instance

  • network (Optional[Network]) – optional Network, representing a VPN to attach this Cluster’s instances to

  • network_addresses (Optional[List[str]]) – optional list of addresses to assign to consecutive spawned instances. If there are too few addresses given in the network_addresses iterable to satisfy all spawned instances, the rest (or all when the list is empty or not provided at all) of the addresses will be assigned automatically. Requires the network argument to be provided at the same time.

example usage:

DATE_OUTPUT_PATH = "/golem/work/date.txt"
REFRESH_INTERVAL_SEC = 5


class DateService(Service):
    @staticmethod
    async def get_payload():
        return await vm.repo(
            image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
        )

    async def start(self):
        async for script in super().start():
            yield script

        # every `DATE_POLL_INTERVAL` write output of `date` to `DATE_OUTPUT_PATH`
        script = self._ctx.new_script()
        script.run(
            "/bin/sh",
            "-c",
            f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
        )
        yield script

    async def run(self):
        while True:
            await asyncio.sleep(REFRESH_INTERVAL_SEC)
            script = self._ctx.new_script()
            future_result = script.run(
                "/bin/sh",
                "-c",
                f"cat {DATE_OUTPUT_PATH}",
            )

            yield script

            result = (await future_result).stdout
            print(result.strip() if result else "")


async def main():
    async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
        cluster = await golem.run_service(DateService, num_instances=1)
        start_time = datetime.now()

        while datetime.now() < start_time + timedelta(minutes=1):
            for num, instance in enumerate(cluster.instances):
                print(f"Instance {num} is {instance.state.value} on {instance.provider_name}")
            await asyncio.sleep(REFRESH_INTERVAL_SEC)
Return type

Cluster[TypeVar(ServiceType, bound= Service)]

async create_network(ip, owner_ip=None, mask=None, gateway=None)

Create a VPN inside Golem network.

Requires yagna >= 0.8

Parameters
  • ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”

  • owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created Network

  • mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)

  • gateway (Optional[str]) – Optional gateway address for the network

Return type

Network

Task API

Task

class yapapi.Task(data)

One computation unit.

Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).

__init__(data)

Create a new Task object.

Parameters

data (TypeVar(TaskData)) – contains information needed to prepare command list for the provider

property running_time: Optional[datetime.timedelta]

Return the running time of the task (if in progress) or time it took to complete it.

Return type

Optional[timedelta]

accept_result(result=None)

Accept the result of this task.

Must be called when the result is correct to mark this task as completed.

Parameters

result (Optional[TypeVar(TaskResult)]) – task computation result (optional)

Return type

None

reject_result(reason=None, retry=False)

Reject the result of this task.

Must be called when the result is not correct to indicate that the task should be retried.

Parameters

reason (Optional[str]) – task rejection description (optional)

Return type

None

Service API

Service

class yapapi.services.Service

Base class for service specifications.

To be extended by application developers to define their own, specialized Service specifications.

property id: str

Return the unique id of this service instance.

Return type

str

property provider_name: Optional[str]

Return the name of the provider that runs this service instance.

Return type

Optional[str]

property network: Optional[yapapi.network.Network]

Return the Network to which this instance belongs (if any)

Return type

Optional[Network]

property network_node: Optional[yapapi.network.Node]

Return the network Node record associated with this instance.

Return type

Optional[Node]

async send_message(message=None)

Send a control message to this instance.

send_message_nowait(message=None)

Send a control message to this instance without blocking.

May raise asyncio.QueueFull if the channel for sending control messages is full.

async receive_message()

Wait for a control message sent to this instance.

Return type

ServiceSignal

receive_message_nowait()

Retrieve a control message sent to this instance.

Return None if no message is available.

Return type

Optional[ServiceSignal]

async static get_payload()

Return the payload (runtime) definition for this service.

To be overridden by the author of a specific Service class.

If get_payload() is not implemented, the payload will need to be provided in the run_service() call.

Return type

Optional[Payload]

async start()

Implement the handler for the starting state of the service.

To be overridden by the author of a specific Service class.

Should perform the minimum set of operations after which the instance of a service can be treated as “started”, or, in other words, ready to receive service requests. It’s up to the developer of the specific Service class to decide what exact operations constitute a service startup. In the most common scenario deploy() and start() are required, check the Default implementation section for more details.

As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.

Results of those batches can then be retrieved by awaiting the values captured from yield statements.

A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to running.

On the other hand, any unhandled exception will cause the instance to be either retried on another provider node, if the Cluster’s respawn_unstarted_instances argument is set to True in run_service(), which is also the default behavior, or altogether terminated, if respawn_unstarted_instances is set to False.

Example:

async def start(self):
    s = self._ctx.new_script()
    # deploy the exe-unit
    s.deploy(**self.get_deploy_args())
    # start the exe-unit's container
    s.start()
    # start some service process within the container
    s.run("/golem/run/service_ctl", "--start")
    # send the batch to the provider
    yield s

### Default implementation

The default implementation assumes that, in order to accept commands, the runtime needs to be first deployed using the deploy() command, which is analogous to creation of a container corresponding with the desired payload, and then started using the start() command, actually launching the process that runs the aforementioned container.

Additionally, it also assumes that the exe-unit doesn’t need any additional parameters in its start() call (e.g. for the VM runtime, all the required parameters are already passed as part of the agreement between the requestor and the provider), and parameters passed to deploy() are returned by Service.get_deploy_args() method.

Therefore, this default implementation performs the minimum required for a VM payload to start responding to run commands. If your service requires any additional operations - you’ll need to override this method (possibly first yielding from the parent - super().start() - generator) to add appropriate preparatory steps.

In case of runtimes other than VM, deploy and/or start might be optional or altogether disallowed, or they may take some parameters. It is up to the author of the specific Service implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type.

Return type

AsyncGenerator[Script, Awaitable[List[CommandEvent]]]

async run()

Implement the handler for the running state of the service.

To be overridden by the author of a specific Service class.

Should contain any operations needed to ensure continuous operation of a service.

As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.

Results of those batches can then be retrieved by awaiting the values captured from yield statements.

A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to stopping.

Any unhandled exception will cause the instance to be terminated.

Example:

async def run(self):
    while True:
        script = self._ctx.new_script()
        stats_results = script.run(self.SIMPLE_SERVICE, "--stats")
        yield script
        stats = (await stats_results).stdout.strip()
        print(f"stats: {stats}")

Default implementation

Because the nature of the operations required during the “running” state depends directly on the specifics of a given Service and because it’s entirely plausible for a service not to require any direct interaction with the exe-unit (runtime) from the requestor’s end after the service has been started, the default is to just wait indefinitely without producing any batches.

Return type

AsyncGenerator[Script, Awaitable[List[CommandEvent]]]

async shutdown()

Implement the handler for the stopping state of the service.

To be overridden by the author of a specific Service class.

Should contain any operations that the requestor needs to ensure the instance is correctly and gracefully shut-down - e.g. that its final state is retrieved.

As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.

Results of those batches can then be retrieved by awaiting the values captured from yield statements.

Finishing the execution of this handler will trigger termination of this instance.

This handler will only be called if the activity running the service is still available. If the activity has already been deemed terminated or if the connection with the provider has been lost, the service will transition to the terminated state and the shutdown handler won’t be run.

Example:

async def shutdown(self):
    script = self._ctx.new_script()
    script.run("/golem/run/dump_state")
    script.download_file("/golem/output/state", "/some/local/path/state")
    script.terminate()
    yield script

Default implementation

By default, the activity is just sent a terminate command. Whether it’s absolutely required or not, again, depends on the implementation of the given runtime.

Return type

AsyncGenerator[Script, Awaitable[List[CommandEvent]]]

async reset()

Reset the service to the initial state.

This method is called internally when the service is restarted in yapapi.services.ServiceRunner, so it is not necessary for services that are never restarted (note that run_service() by default restarts services that didn’t start properly).

Handlers of a restarted service are called more then once - all of the cleanup necessary between calls should be implemented here. E.g. if we initialize a counter in Service.__init__() and increment it in Service.start(), we might want to reset it here to the initial value.

Target implementation (0.10.0 and up) will raise NotImplementedError. Current implementation only warns about this future change.

Return type

None

property is_available

Return True iff this instance is available (that is, starting, running or stopping).

property state

Return the current state of this instance.

Cluster

class yapapi.services.Cluster(engine, service_class, payload, expiration=None, respawn_unstarted_instances=True, network=None)

Golem’s sub-engine used to spawn and control instances of a single Service.

async terminate()

Signal the whole Cluster and the underlying ServiceRunner to stop.

stop()

Stop all services in this Cluster

property expiration: datetime.datetime

Return the expiration datetime for agreements related to services in this Cluster.

Return type

datetime

property payload: yapapi.payload.Payload

Return the service runtime definition for this Cluster.

Return type

Payload

property service_class: Type[yapapi.services.service.ServiceType]

Return the class instantiated by all service instances in this Cluster.

Return type

Type[TypeVar(ServiceType, bound= Service)]

property network: Optional[yapapi.network.Network]

Return the Network record associated with the VPN used by this Cluster.

Return type

Optional[Network]

property instances: List[yapapi.services.service.ServiceType]

Return the list of service instances in this Cluster.

Return type

List[TypeVar(ServiceType, bound= Service)]

stop_instance(service)

Stop the specific Service instance belonging to this Cluster.

spawn_instances(num_instances=None, instance_params=None, network_addresses=None)

Spawn new instances within this Cluster.

Parameters
  • num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will spawn as many instances as there are elements in the instance_params iterable. if num_instances is not None and < 1, the method will immediately return and log a warning.

  • instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the number of instances spawned will be equal to num_instances and if there are too few elements in the instance_params iterable, it will results in an error.

  • network_addresses (Optional[List[str]]) – optional list of network addresses in case the Cluster is attached to VPN. If the list is not provided (or if the number of elements is less than the number of spawned instances), any instances for which the addresses have not been given, will be assigned an address automatically.

Return type

None

ServiceState

class yapapi.services.ServiceState(model=None, state_field='state', start_value=None)

State machine describing the state and lifecycle of a Service instance.

Network API

Network

class yapapi.network.Network(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)

Describes a VPN created between the requestor and the provider nodes within Golem Network.

async classmethod create(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)

Create a new VPN.

Parameters
  • net_api (Net) – the mid-level binding used directly to perform calls to the REST API.

  • ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”

  • owner_id (str) – the node ID of the owner of this VPN (the requestor)

  • owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created network

  • mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)

  • gateway (Optional[str]) – Optional gateway address for the network

Return type

Network

property owner_ip: str

The IP address of the requestor node within the network.

Return type

str

property network_address: str

The network address of this network, without a netmask.

Return type

str

property netmask: str

The netmask of this network.

Return type

str

property gateway: Optional[str]

The gateway address within this network, if provided.

Return type

Optional[str]

property nodes_dict: Dict[str, str]

Mapping between the IP addresses and Node IDs of the nodes within this network.

Return type

Dict[str, str]

property network_id: str

The automatically-generated, unique ID of this VPN.

Return type

str

async add_owner_address(ip)

Assign the given IP address to the requestor in the network.

Parameters

ip (str) – the IP address to assign to the requestor node.

async add_node(node_id, ip=None)

Add a new node to the network.

Parameters
  • node_id (str) – Node ID within the Golem network of this VPN node.

  • ip (Optional[str]) – IP address to assign to this node.

Return type

Node

Node

class yapapi.network.Node(network, node_id, ip)

Describes a node in a VPN, mapping a Golem node id to an IP address.

network: yapapi.network.Network

The Network (the specific VPN) this node is part of.

node_id: str

Golem id of the node.

ip: str

IP address of this node in this particular VPN.

get_deploy_args()

Generate a dictionary of arguments that are required for the appropriate Deploy command of an exescript in order to pass the network configuration to the runtime on the provider’s end.

Return type

Dict

Exceptions

class yapapi.network.NetworkError

Exception raised by Network when an operation is not possible

Payload definition

Payload

class yapapi.payload.Payload(**kwargs)

Base class for descriptions of the payload required by the requestor.

example usage:

import asyncio

from dataclasses import dataclass
from yapapi.props.builder import DemandBuilder
from yapapi.props.base import prop, constraint
from yapapi.props import inf

from yapapi.payload import Payload

CUSTOM_RUNTIME_NAME = "my-runtime"
CUSTOM_PROPERTY = "golem.srv.app.myprop"


@dataclass
class MyPayload(Payload):
    myprop: str = prop(CUSTOM_PROPERTY, default="myvalue")
    runtime: str = constraint(inf.INF_RUNTIME_NAME, default=CUSTOM_RUNTIME_NAME)
    min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16)
    min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024)


async def main():
    builder = DemandBuilder()
    payload = MyPayload(myprop="othervalue", min_mem_gib=32)
    await builder.decorate(payload)
    print(builder)

asyncio.run(main())

output:

{'properties': {'golem.srv.app.myprop': 'othervalue'}, 'constraints': ['(&(golem.runtime.name=my-runtime)
(golem.inf.mem.gib>=32)
(golem.inf.storage.gib>=1024))']}

Package

class yapapi.payload.package.Package

Description of a task package (e.g. a VM image) deployed on the provider nodes

vm.repo

async yapapi.payload.vm.repo(*, image_hash, image_url=None, min_mem_gib=0.5, min_storage_gib=2.0, min_cpu_threads=1, capabilities=None)

Build a reference to application package.

Parameters
  • image_hash (str) – hash of the package’s image

  • image_url (Optional[str]) – URL of the package’s image

  • min_mem_gib (float) – minimal memory required to execute application code

  • min_storage_gib (float) – minimal disk storage to execute tasks

  • min_cpu_threads (int) – minimal available logical CPU cores

  • capabilities (Optional[List[Literal[‘vpn’]]]) – an optional list of required vm capabilities

Return type

Package

Returns

the payload definition for the given VM image

example usage:

package = await vm.repo(
    # if we provide only the image hash, the image will be
    # automatically pulled from Golem's image repository
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

example usage with an explicit GVMI image URL (useful to host images outside the Golem repository):

package = await vm.repo(
    # we still need to provide the image's hash because
    # the image's integrity is validated by the runtime on the provider node
    #
    # the hash can be calculated by running `sha3sum -a 224 <image_filename.gvmi>`
    #
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",

    # the URL can point to any publicly-available location on the web
    image_url="http://girepo.dev.golem.network:8000/docker-golem-hello-world-latest-779758b432.gvmi",
)

example usage with additional constraints:

package = await vm.repo(
    image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
    # only run on provider nodes that have more than 0.5gb of RAM available
    min_mem_gib=0.5,
    # only run on provider nodes that have more than 2gb of storage space available
    min_storage_gib=2.0,
    # only run on provider nodes which a certain number of CPU threads available
    min_cpu_threads=min_cpu_threads,
)

Execution control

WorkContext

class yapapi.WorkContext(activity, agreement, storage, emitter)

Provider node’s work context.

Used to schedule commands to be sent to the provider and enable other interactions between the requestor agent’s client code and the activity on provider’s end.

property id: str

Unique identifier for this work context.

Return type

str

property provider_name: Optional[str]

Return the name of the provider associated with this work context.

Return type

Optional[str]

property provider_id: str

Return the id of the provider associated with this work context.

Return type

str

new_script(timeout=None, wait_for_results=True)

Create an instance of Script attached to this WorkContext instance.

This is equivalent to calling Script(work_context). This method is intended to provide a direct link between the two object instances.

Return type

Script

async get_raw_usage()

Get the raw usage vector for the activity bound to this work context.

The value comes directly from the low level API and is not interpreted in any way.

Return type

ActivityUsage

async get_usage()

Get the current usage for the activity bound to this work context.

Return type

ActivityUsage

async get_raw_state()

Get the state activity bound to this work context.

The value comes directly from the low level API and is not interpreted in any way.

Return type

ActivityState

async get_cost()

Get the accumulated cost of the activity based on the reported usage.

Return type

Optional[float]

Script

class yapapi.script.Script(context, timeout=None, wait_for_results=True)

Represents a series of commands to be executed on a provider node.

New commands are added to the script either through its add() method or by calling one of the convenience methods provided (for example: run() or upload_json()). Adding a new command does not result in it being immediately executed. Once ready, a Script instance is meant to be yielded from a worker function (work generator pattern). Commands will be run in the order in which they were added to the script.

__init__(context, timeout=None, wait_for_results=True)

Initialize a Script

Parameters
  • context (WorkContext) – A yapapi.WorkContext that will be used to evaluate the script (i.e. to send commands to the provider)

  • timeout (Optional[timedelta]) – Time after which this script’s execution should be forcefully interrupted. The default value is None which means there’s no timeout set.

  • wait_for_results (bool) – Whether this script’s execution should block until its results are available. The default value is True.

property id: int

Return the ID of this Script instance.

IDs are provided by a global iterator and therefore are guaranteed to be unique during the program’s execution.

Return type

int

add(cmd)

Add a yapapi.script.command.Command to the Script

Return type

Awaitable[CommandExecuted]

deploy(**kwargs)

Schedule a Deploy command on the provider.

Return type

Awaitable[CommandExecuted]

start(*args)

Schedule a Start command on the provider.

Return type

Awaitable[CommandExecuted]

terminate()

Schedule a Terminate command on the provider.

Return type

Awaitable[CommandExecuted]

run(cmd, *args, env=None, stderr=None, stdout=None)

Schedule running a shell command on the provider.

Parameters
  • cmd (str) – command to run on the provider, e.g. /my/dir/run.sh

  • args (str) – command arguments, e.g. “input1.txt”, “output1.txt”

  • env (Optional[Dict[str, str]]) – optional dictionary with environment variables

  • stderr (Optional[CaptureContext]) – capture context to use for stderr

  • stdout (Optional[CaptureContext]) – capture context to use for stdout

Return type

Awaitable[CommandExecuted]

download_bytes(src_path, on_download, limit=1048576)

Schedule downloading a remote file from the provider as bytes.

Parameters
  • src_path (str) – remote (provider) source path

  • on_download (Callable[[bytes], Awaitable]) – the callable to run on the received data

  • limit (int) – limit of bytes to be downloaded (expected size)

Return type

Awaitable[CommandExecuted]

download_file(src_path, dst_path)

Schedule downloading a remote file from the provider.

Parameters
  • src_path (str) – remote (provider) source path

  • dst_path (str) – local (requestor) destination path

Return type

Awaitable[CommandExecuted]

download_json(src_path, on_download, limit=1048576)

Schedule downloading a remote file from the provider as JSON.

Parameters
  • src_path (str) – remote (provider) source path

  • on_download (Callable[[Any], Awaitable]) – the callable to run on the received data

  • limit (int) – limit of bytes to be downloaded (expected size)

Return type

Awaitable[CommandExecuted]

upload_bytes(data, dst_path)

Schedule sending bytes data to the provider.

Parameters
  • data (bytes) – bytes to send

  • dst_path (str) – remote (provider) destination path

Return type

Awaitable[CommandExecuted]

upload_file(src_path, dst_path)

Schedule sending a file to the provider.

Parameters
  • src_path (str) – local (requestor) source path

  • dst_path (str) – remote (provider) destination path

Return type

Awaitable[CommandExecuted]

upload_json(data, dst_path)

Schedule sending JSON data to the provider.

Parameters
  • data (dict) – dictionary representing JSON data to send

  • dst_path (str) – remote (provider) destination path

Return type

Awaitable[CommandExecuted]

Market strategies

class yapapi.strategy.MarketStrategy

Abstract market strategy.

acceptable_prop_value_range_overrides: Dict[str, yapapi.strategy.base.PropValueRange]

Optional overrides to the acceptable property value ranges.

property acceptable_prop_value_ranges: Dict[str, yapapi.strategy.base.PropValueRange]

The range of acceptable property values for negotiable properties.

Return type

Dict[str, PropValueRange]

async respond_to_provider_offer(our_demand, provider_offer)

Respond to the provider’s OfferProposal with acceptable values for negotiable properties.

Includes negotiation of the properties required for mid-agreement payments.

Return type

DemandBuilder

async decorate_demand(demand)

Optionally add relevant constraints to a Demand.

Return type

None

async invoice_accepted_amount(invoice)

Accept full invoice amount.

Return type

Decimal

abstract async score_offer(offer)

Score offer. Better offers should get higher scores.

Return type

float

async debit_note_accepted_amount(debit_note)

Accept full debit note amount.

Return type

Decimal

class yapapi.strategy.WrappingMarketStrategy(base_strategy)

Helper abstract class which allows other/user defined strategies to wrap some other strategies, without overriding the attributes (e.g. defaults) defined on the derived-from strategy.

WrappingMarketStrategy classes are unusable on their own and always have to wrap some base strategy.

By default all attributes and method calls are forwarded to the base_strategy.

__init__(base_strategy)
Parameters

base_strategy (BaseMarketStrategy) – the base strategy around which this strategy is wrapped

base_strategy: yapapi.strategy.base.BaseMarketStrategy

base strategy wrapped by this wrapper.

class yapapi.strategy.LeastExpensiveLinearPayuMS(expected_time_secs=60, max_fixed_price=Decimal('Infinity'), max_price_for=mappingproxy({}))

A strategy that scores offers according to cost for given computation time.

async score_offer(offer)

Score offer according to cost for expected computation time.

Return type

float

class yapapi.strategy.DecreaseScoreForUnconfirmedAgreement(base_strategy, factor)

A market strategy wrapper that modifies scoring of providers based on history of agreements.

It decreases the scores for providers that rejected our agreements.

The strategy keeps an internal list of providers who have rejected proposed agreements and removes them from this list when they accept one.

on_event(event)

Modify the internal _rejecting_providers list on AgreementConfirmed/Rejected.

This method needs to be added as an event consumer in yapapi.Golem.add_event_consumer().

Return type

None

class yapapi.strategy.PropValueRange(min=None, max=None)

Range definition for a negotiable property.

Used in yapapi.strategy.MarketStrategy.acceptable_prop_value_ranges()

__contains__(item)

Check if the value fits inside the range.

When max is None, any value above min fits. Whem min is None, any value below max fits. When both min and max are None, any value fits.

Return type

bool

clamp(item)

Return a value closest to the given one, within the acceptable range.

Parameters

item (float) – the value to clamp

Return type

float

Returns

clamped value

In case a range is defined with max < min (effectively making it an empty range), clamp raises a ValueError.

__init__(min=None, max=None)
class yapapi.strategy.DummyMS(max_fixed_price=Decimal('0.05'), max_price_for=mappingproxy({}), activity=None)

A default market strategy implementation.

[ DEPRECATED, use LeastExpensiveLinearPayuMS instead ]

Its score_offer() method returns SCORE_NEUTRAL for every offer with prices that do not exceed maximum prices specified for each counter. For other offers, returns SCORE_REJECTED.

Events

Objects representing events in a Golem computation.

Everytime something important happens, an event is emitted. Emitted events are passed to all current event consumers, set either in yapapi.Golem.__init__() or via yapapi.Golem.add_event_consumer().

Events are considered a semi-experimental feature:

  • The backward compatibility of subsequent future updates is not guaranteed, the interface might change between major releases without prior deprecation.

  • Some parts are not documented and should not be considered a public interface.

Every event is described by a set of attributes that can be divided into three groups:

  • Attributes common to all events, documented on yapapi.events.Event.

  • Internal parts of yapapi that define the context of the event (details in the next section).

  • Additional event-specific information, e.g. the reason of the agreement termination for the AgreementTerminated event, described along the particular event classes.

Events should be consumed in a strict read_only mode: event objects are shared between all event consumers, and their attributes are used internally by the Golem engine, so any modification may have unexpected side effects.

Attributes shared by various events

Name

Value

Description

job

yapapi.engine.Job

The highest-level unit of a Golem computation, corresponding to a single call to yapapi.Golem.execute_tasks() or yapapi.Golem.run_service()

subscription

yapapi.rest.market.Subscription

Object responsible for gathering offers from the market

proposal

yapapi.rest.market.OfferProposal

A single offer from the market

agreement

yapapi.rest.market.Agreement

An agreement between provider and requestor (not necessarly confirmed)

activity

yapapi.rest.activity.Activity

Object corresponding to a provider-side activity within an agreement

script

yapapi.script.Script

[Class documentation available]

command

yapapi.script.command.Command

A single command to be executed within a Script

task

yapapi.executor.task.Task

[Class documentation available]

service

yapapi.services.Service

[Class documentation available]

debit_note

yapapi.rest.payment.DebitNote

A single debit note sent by the provider

invoice

yapapi.rest.payment.Invoice

A final invoice sent by the provider

Events inheritance tree

  • Only leaf events are ever emitted, other events (named *Event) are abstract classes

  • Every abstract class has one more yapapi object attached then the parent, e.g.

Event
    JobEvent
        SubscriptionFailed
        SubscriptionEvent
            SubscriptionCreated
            CollectFailed
        ProposalEvent
            ProposalReceived
            ProposalRejected
            ProposalResponded
            ProposalConfirmed
            ProposalFailed
        NoProposalsConfirmed
        JobStarted
        JobFinished
        AgreementEvent
            AgreementCreated
            AgreementConfirmed
            AgreementRejected
            AgreementTerminated
            ActivityCreateFailed
            WorkerStarted
            ActivityEvent
                ActivityCreated
                TaskEvent
                    TaskStarted
                    TaskFinished
                    TaskAccepted
                    TaskRejected
                ServiceEvent
                    ServiceStateChanged
                    ServiceFinished
                ScriptEvent
                    ScriptSent
                    CommandEvent
                        CommandStarted
                        CommandStdOut
                        CommandStdErr
                        CommandExecuted
                        DownloadStarted
                        DownloadFinished
                    GettingResults
                    ScriptFinished
                WorkerFinished
            InvoiceEvent
                InvoiceReceived
                InvoiceAccepted
            DebitNoteEvent
                DebitNoteReceived
                DebitNoteAccepted
            PaymentFailed
    ExecutionInterrupted
    ShutdownFinished

Custom events

Apart from consuming events emitted by the Python API’s internal components, application authors may wish to define their own, custom event classes. That way, Golem’s event system can be easily extended with additional, custom triggers while keeping the consumer logic uniform for both Golem’s own events and the custom ones.

Additionally, such custom events may be used to create pieces of useful, reusable components while keeping the the emitting and consuming logic separate at the same time. As an example, one could provide a custom MarketStrategy that consumes an event such as e.g.: ActivityEvaluated and have many different applications utilize this strategy but evaluate activities in different ways.

Example usage

Declare the class:

from yapapi.events import ActivityEvent
import attr

@attr.s(auto_attribs=True, repr=False)
class ActivityEvaluated(ActivityEvent):
    activity_score: float

Emit the event e.g. in the worker() function of the Task API:

async def worker(ctx: WorkContext, tasks):
    ... #  whatever
    activity_score = await score_activity(ctx)  # Some app-specific logic
    ctx.emit(ActivityEvaluated, activity_score=activity_score)

And consume the event anywhere you want (e.g. in a MarketStrategy method):

def event_consumer(event):
    if isinstance(event, ActivityEvaluated):
        if event.activity_score < 7:
            print(f"Oh no! Activity {event.activity.id} is scored below 7!")

golem.add_event_consumer(event_consumer)

List of event classes

class yapapi.events.Event(*, exc_info=None)

An abstract base class for all types of events.

exc_info: Optional[Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]]]

Tuple containing exception info as returned by sys.exc_info(), if applicable.

timestamp: datetime.datetime

Event creation time

property exception: Optional[BaseException]

Exception associated with this event or None

Return type

Optional[BaseException]

class yapapi.events.JobEvent(job, *, exc_info=None)
class yapapi.events.SubscriptionEvent(job, subscription, *, exc_info=None)
class yapapi.events.ProposalEvent(job, proposal, *, exc_info=None)
class yapapi.events.AgreementEvent(job, agreement, *, exc_info=None)
class yapapi.events.ActivityEvent(job, agreement, activity, *, exc_info=None)
class yapapi.events.TaskEvent(job, agreement, activity, task, *, exc_info=None)
class yapapi.events.ServiceEvent(job, agreement, activity, service, *, exc_info=None)
class yapapi.events.ScriptEvent(job, agreement, activity, script, *, exc_info=None)
class yapapi.events.CommandEvent(job, agreement, activity, script, command, *, exc_info=None)
class yapapi.events.InvoiceEvent(job, agreement, invoice, *, exc_info=None)
class yapapi.events.DebitNoteEvent(job, agreement, debit_note, *, exc_info=None)
class yapapi.events.JobStarted(job, *, exc_info=None)
class yapapi.events.JobFinished(job, *, exc_info=None)

job is done, succeded if exception is None failed otherwise.

class yapapi.events.SubscriptionCreated(job, subscription, *, exc_info=None)
class yapapi.events.SubscriptionFailed(job, reason, *, exc_info=None)
class yapapi.events.CollectFailed(job, subscription, reason, *, exc_info=None)
class yapapi.events.ProposalReceived(job, proposal, *, exc_info=None)
class yapapi.events.ProposalRejected(job, proposal, reason=None, *, exc_info=None)

We decided to reject provider’s proposal because of a reason

class yapapi.events.ProposalResponded(job, proposal, *, exc_info=None)
class yapapi.events.ProposalConfirmed(job, proposal, *, exc_info=None)
class yapapi.events.ProposalFailed(job, proposal, *, exc_info=None)
class yapapi.events.NoProposalsConfirmed(job, timeout, *, exc_info=None)

We didn’t confirm any proposal for a period of timeout

class yapapi.events.AgreementCreated(job, agreement, *, exc_info=None)
class yapapi.events.AgreementConfirmed(job, agreement, *, exc_info=None)
class yapapi.events.AgreementRejected(job, agreement, *, exc_info=None)
class yapapi.events.AgreementTerminated(job, agreement, reason, *, exc_info=None)
class yapapi.events.DebitNoteReceived(job, agreement, debit_note, *, exc_info=None)
class yapapi.events.DebitNoteAccepted(job, agreement, debit_note, *, exc_info=None)
class yapapi.events.PaymentFailed(job, agreement, *, exc_info=None)
class yapapi.events.InvoiceReceived(job, agreement, invoice, *, exc_info=None)
class yapapi.events.InvoiceAccepted(job, agreement, invoice, *, exc_info=None)
class yapapi.events.WorkerStarted(job, agreement, *, exc_info=None)
class yapapi.events.ActivityCreated(job, agreement, activity, *, exc_info=None)
class yapapi.events.ActivityCreateFailed(job, agreement, *, exc_info=None)
class yapapi.events.TaskStarted(job, agreement, activity, task, *, exc_info=None)
class yapapi.events.TaskFinished(job, agreement, activity, task, *, exc_info=None)
class yapapi.events.ServiceStateChanged(job, agreement, activity, service, old, new, *, exc_info=None)
class yapapi.events.ServiceFinished(job, agreement, activity, service, *, exc_info=None)
class yapapi.events.WorkerFinished(job, agreement, activity, *, exc_info=None)
class yapapi.events.ScriptSent(job, agreement, activity, script, *, exc_info=None)
class yapapi.events.GettingResults(job, agreement, activity, script, *, exc_info=None)
class yapapi.events.ScriptFinished(job, agreement, activity, script, *, exc_info=None)
class yapapi.events.CommandExecuted(job, agreement, activity, script, command, success, message, stdout=None, stderr=None, *, exc_info=None)
class yapapi.events.CommandStarted(job, agreement, activity, script, command, *, exc_info=None)
class yapapi.events.CommandStdOut(job, agreement, activity, script, command, output, *, exc_info=None)
class yapapi.events.CommandStdErr(job, agreement, activity, script, command, output, *, exc_info=None)
class yapapi.events.TaskAccepted(job, agreement, activity, task, *, exc_info=None)
class yapapi.events.TaskRejected(job, agreement, activity, task, reason, *, exc_info=None)
class yapapi.events.DownloadStarted(job, agreement, activity, script, command, *, exc_info=None)
class yapapi.events.DownloadFinished(job, agreement, activity, script, command, *, exc_info=None)
class yapapi.events.ShutdownFinished(*, exc_info=None)

Golem completed the shutdown sequence and is no longer operative

class yapapi.events.ExecutionInterrupted(*, exc_info=None)

Golem was stopped by an unhandled exception in code not managed by yapapi

Exceptions

exception yapapi.NoPaymentAccountError(required_driver, required_network)

The error raised if no payment account for the required driver/network is available.

exception yapapi.rest.activity.BatchTimeoutError

An exception that indicates that an execution of a batch of commands timed out.

Logging

yapapi.log.enable_default_logger(format_='[%(asctime)s %(levelname)s %(name)s] %(message)s', log_file=None, debug_activity_api=False, debug_market_api=False, debug_payment_api=False, debug_net_api=False)

Enable the default logger that logs messages to stderr with level INFO.

If log_file is specified, the logger with output messages with level DEBUG to the given file.

class yapapi.log.SummaryLogger(wrapped_emitter=None)

Aggregates information from computation events to provide a high-level summary.

The logger’s log() method can be used as event_consumer callback in Golem initialization. It will aggregate the events generated and output some summary information.

The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with log() is first passed to wrapped_emitter.

For example, with the following setup, each event will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.

detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
golem = Golem(..., event_consumer=summary_logger)
log(event)

Register an event.

Return type

None

yapapi.log.log_summary(wrapped_emitter=None)

Output a summary of computation.

This is a utility function that creates a SummaryLogger instance wrapping an optional wrapped_emitter and returns its log() method.

See the documentation of SummaryLogger for more information.

Utils

yapapi.windows_event_loop_fix()

Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.

This work-around is only needed for Python 3.6 and 3.7. With Python 3.8, ProactorEventLoop is already the default on Windows.

yapapi.get_version()
Return type

str

Returns

the version of the yapapi library package

Yapapi Contrib

Useful, reusable pieces of code built on top of yapapi.

This part of yapapi repository serves to provide requestor agent application authors with reusable components that we or third-party developers found useful while working on examples and apps that use our Python API.

They’re not strictly part of the high level API library itself and while we do intend to keep them in sync with the subsequent releases, they should be treated as experimental and their interface may change without a proper deprecation process.

Provider Filter

Market strategy wrapper that enables easy exclusion of offers from certain providers using a simple boolean condition, while preserving correct scoring of the remaining offers by the base strategy.

class yapapi.contrib.strategy.ProviderFilter(base_strategy, is_allowed)

ProviderFilter - extend a market strategy with a layer that excludes offers from certain issuers

Parameters
  • base_strategy (BaseMarketStrategy) – a market strategy that will be used to score offers from allowed providers

  • is_allowed (Union[Callable[[str], bool], Callable[[str], Awaitable[bool]]]) – a callable that accepts provider_id as an argument and returns either a boolean, or a boolean-returning awaitable, determining if offers from this provider should be considered (that is: scored by the base_strategy)

Example 1. Block selected providers:

bad_providers = ['bad_provider_1', 'bad_provider_2']
base_strategy = SomeStrategy(...)
strategy = ProviderFilter(base_strategy, lambda provider_id: provider_id not in bad_providers)

Example 2. Select providers using a database table:

#   create an async database connection
#   (sync would also work, but could hurt `yapapi` overall performance)
async_conn = ...

async def is_allowed(provider_id):
    result = await async_conn.execute("SELECT 1 FROM allowed_providers WHERE provider_id = ?", provider_id)
    return bool(result.fetchall())

base_strategy = SomeStrategy()
strategy = ProviderFilter(base_strategy, is_allowed)

Example 3. Use the default strategy, but disable every provider that fails to create an activity:

from yapapi import events

bad_providers = set()

def denying_event_consumer(event: events.Event):
    if isinstance(event, events.ActivityCreateFailed):
        bad_providers.add(event.provider_id)

golem = Golem(...)
golem.strategy = ProviderFilter(golem.strategy, lambda provider_id: provider_id not in bad_providers)
await golem.add_event_consumer(denying_event_consumer)

async with golem:
    ...

#   NOTE: this will currently work only for **new** offers from the provider, because old offers are already
#   scored, this should improve in https://github.com/golemfactory/yapapi/issues/820

Local HTTP Proxy

A local HTTP proxy that enables easy connections to any VPN-enabled, HTTP-based services launched on Golem providers using yapapi’s Services API.

For usage in a complete requestor agent app, see the http-proxy and webapp examples in the yapapi repository.

class yapapi.contrib.service.http_proxy.LocalHttpProxy(cluster, port)

Runs a local aiohttp server and processes requests through instances of HttpProxyService.

Using yapapi’s Network API (create_network()), execution units on the provider nodes can be connected to a virtual network which can then be used both for communication between those nodes (through virtual network interfaces within VMs) and between the specific nodes and the requestor agent (through a websocket endpoint in the yagna daemon’s REST API).

LocalHttpProxy and HttpProxyService use the latter to enable HTTP connections to be routed from a local port on the requestor’s host, to a specified TCP port within the VM on the provider’s end.

Example usage:

class HttpService(HttpProxyService):
    ...

cluster = await golem.run_service(
    HttpService,
    network=network,
    instance_params=[{"remote_port": 80}],  # optional, 80 by default
)

proxy = LocalHttpProxy(cluster, 8080)
await proxy.run()

... # requests made to http://localhost:8080 are routed to port 80 within the VM

await proxy.stop()
cluster.stop()
__init__(cluster, port)

Initialize the local HTTP proxy

Parameters
async run()

run a local HTTP server, listening on the specified port and passing subsequent requests to the handle_request() of the specified cluster in a round-robin fashion

class yapapi.contrib.service.http_proxy.HttpProxyService(remote_port=80, remote_host=None, response_timeout=10.0)

Base class for services connected to the LocalHttpProxy.

Implements the interface used by LocalHttpProxy to route HTTP requests to the instances of an HTTP service running on providers.

__init__(remote_port=80, remote_host=None, response_timeout=10.0)

Initialize the HTTP proxy service

Parameters
  • remote_port (int) – the port on which the service on the provider’s end listens

  • remote_host (Optional[str]) – optional hostname to be used in the headers passed to the remote HTTP server. If not provided, the Host header of the incoming http requests will be passed.

  • response_timeout (float) – the timeout for the requests made to the remote server

async handle_request(request)

handle a single request coming from a LocalHttpProxy server by passing it to the HTTP service on the provider’s end through the VPN

Parameters

request (Request) – an aiohttp.web.Request

Return type

Response

Returns

an aiohttp.web.Response