Golem Python API Reference
Golem
- class yapapi.Golem(*, budget, strategy=None, subnet_tag=None, driver=None, payment_driver=None, network=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)
The main entrypoint of Golem's high-level API.
Its principal role is providing an interface to run the requestor’s payload using one of two modes of operation - executing tasks and running services.
The first one, available through execute_tasks(), instructs Golem to take a sequence of tasks that the user wishes to compute on Golem and distribute those among the providers.
The second one, invoked with run_service(), makes Golem spawn a certain number of instances of a service based on a single service specification (a specialized implementation inheriting from Service).
While the two modes are not necessarily completely disjoint - in that we can create a service that exists to process a certain number of computations and, similarly, we can use the task model to run some service - the main difference lies in the lifetime of such a job.
Whereas a task-based job exists for the purpose of computing the specific sequence of tasks and is done once the whole sequence has been processed, the service-based job is created for a potentially indefinite period and the services spawned within it are kept alive for as long as they’re needed.
Additionally, the service interface provides a way to easily define handlers for certain, discrete phases of a lifetime of each service instance - startup, running and shutdown.
Internally, Golem’s job includes running the engine which takes care of first finding the providers interested in the jobs the requestors want to execute, then negotiating agreements with them and facilitating the execution of those jobs and lastly, processing payments. For this reason, it’s usually good to have just one instance of Golem operative at any given time.
- __init__(*, budget, strategy=None, subnet_tag=None, driver=None, payment_driver=None, network=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)
Initialize Golem engine.
- Parameters
budget (Union[float, Decimal]) – maximum budget for payments
strategy (Optional[BaseMarketStrategy]) – market strategy used to select providers from the market (e.g. yapapi.strategy.LeastExpensiveLinearPayuMS or yapapi.strategy.DummyMS)
subnet_tag (Optional[str]) – use only providers in the subnet with the subnet_tag name. Uses YAGNA_SUBNET environment variable, defaults to None
driver (Optional[str]) – deprecated, please use payment_driver instead
payment_driver (Optional[str]) – name of the payment driver to use. Uses YAGNA_PAYMENT_DRIVER environment variable, defaults to erc20. Only payment platforms with the specified driver will be used
network (Optional[str]) – deprecated, please use payment_network instead
payment_network (Optional[str]) – name of the network to use. Uses YAGNA_NETWORK environment variable, defaults to rinkeby. Only payment platforms with the specified network will be used
event_consumer (Optional[Callable[[Event], None]]) – a callable that processes events related to the computation; by default it is a function that logs all events
stream_output (bool) – stream computation output from providers
app_key (Optional[str]) – optional Yagna application key. If not provided, the default is to get the value from YAGNA_APPKEY environment variable
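The environment-variable fallbacks described for subnet_tag, payment_driver, payment_network and app_key can be pictured with a small stdlib-only sketch. The helper name resolve_setting is hypothetical, and the precedence shown (explicit argument, then environment variable, then default) is an assumption based on the parameter descriptions above, not a copy of yapapi's code.

```python
import os
from typing import Mapping, Optional

# Defaults quoted from the parameter descriptions above.
DEFAULT_PAYMENT_DRIVER = "erc20"
DEFAULT_PAYMENT_NETWORK = "rinkeby"


def resolve_setting(
    explicit: Optional[str],
    env_var: str,
    default: Optional[str],
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Hypothetical helper: explicit argument first, then environment, then default."""
    if explicit is not None:
        return explicit
    return env.get(env_var, default)


# Usage with an explicit mapping instead of the real process environment.
driver = resolve_setting(None, "YAGNA_PAYMENT_DRIVER", DEFAULT_PAYMENT_DRIVER, env={})
```

Passing the environment as a mapping keeps the helper easy to exercise without touching the real process environment.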
- add_event_consumer(event_consumer, event_classes_or_names=(<class 'yapapi.events.Event'>, ))
Initialize another event_consumer, working just like the event_consumer passed to Golem.__init__()
- Parameters
event_consumer (Callable[[Event], None]) – A callable that will be executed on every event.
event_classes_or_names (Iterable[Union[Type[Event], str]]) – An iterable defining classes of events that should be passed to this event_consumer. Both classes and class names are accepted (in the latter case classes must be available in the yapapi.events namespace). If this argument is omitted, all events inheriting from yapapi.events.Event (i.e. all currently implemented events) will be passed to the event_consumer.
Example usages:
    def event_consumer(event: "yapapi.events.Event"):
        print(f"Got an event! {type(event).__name__}")

    golem.add_event_consumer(event_consumer)

    def event_consumer(event: "yapapi.events.AgreementConfirmed"):
        provider_name = event.agreement.details.provider_node_info.name
        print(f"We're trading with {provider_name}! Nice!")

    golem.add_event_consumer(event_consumer, ["AgreementConfirmed"])
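To make the filtering by classes or class names more concrete, here is a minimal stdlib-only sketch. MiniDispatcher, the stand-in event classes and EVENT_NAMESPACE are all hypothetical; the sketch only mirrors the behaviour described above (names are resolved in a namespace, and a consumer receives events that are instances of its registered classes) and makes no claim about how yapapi implements it.

```python
from typing import Callable, Iterable, List, Tuple, Type, Union


class Event:
    """Stand-in for yapapi.events.Event (hypothetical, for illustration only)."""


class AgreementConfirmed(Event):
    """Stand-in event class."""


class TaskAccepted(Event):
    """Stand-in event class."""


# Plays the role of the yapapi.events namespace when resolving class names.
EVENT_NAMESPACE = {cls.__name__: cls for cls in (Event, AgreementConfirmed, TaskAccepted)}


class MiniDispatcher:
    def __init__(self) -> None:
        self._consumers: List[Tuple[Callable[[Event], None], Tuple[Type[Event], ...]]] = []

    def add_event_consumer(
        self,
        consumer: Callable[[Event], None],
        event_classes_or_names: Iterable[Union[Type[Event], str]] = (Event,),
    ) -> None:
        # Names are looked up in the namespace; classes are used as given.
        classes = tuple(
            EVENT_NAMESPACE[item] if isinstance(item, str) else item
            for item in event_classes_or_names
        )
        self._consumers.append((consumer, classes))

    def emit(self, event: Event) -> None:
        # Each consumer only sees events that are instances of its classes.
        for consumer, classes in self._consumers:
            if isinstance(event, classes):
                consumer(event)
```

With the default argument a consumer sees every event, because every stand-in class inherits from Event.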
- async start()
Start the Golem engine in non-contextmanager mode.
The default way of using Golem:
    async with Golem(...) as golem:
        # ... work with golem
Is roughly equivalent to:
    golem = Golem(...)
    try:
        await golem.start()
        # ... work with golem
    finally:
        await golem.stop()
A repeated call to Golem.start():
- If Golem is already starting, or started and wasn’t stopped - will be ignored (and harmless)
- If Golem was stopped - will initialize a new engine that knows nothing about the previous operations
- Return type
None
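The start/stop semantics described for start() can be sketched without yapapi. MiniGolem and MiniEngine are hypothetical stand-ins; the sketch only illustrates that a repeated start is ignored, that starting after a stop creates a fresh engine, and that the context manager form wraps the same two calls.

```python
import asyncio


class MiniEngine:
    """Hypothetical stand-in for the engine that start() creates."""


class MiniGolem:
    def __init__(self) -> None:
        self.engine = None

    async def start(self) -> None:
        if self.engine is not None:
            return  # already started and not stopped: the call is ignored
        self.engine = MiniEngine()

    async def stop(self) -> None:
        self.engine = None

    async def __aenter__(self) -> "MiniGolem":
        await self.start()
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.stop()


async def demo():
    golem = MiniGolem()
    await golem.start()
    first = golem.engine
    await golem.start()                 # repeated call, same engine
    same_engine = golem.engine is first
    await golem.stop()
    await golem.start()                 # after stop: a new engine
    fresh_engine = golem.engine is not first
    await golem.stop()
    async with MiniGolem() as other:    # context manager form
        started_in_context = other.engine is not None
    return same_engine, fresh_engine, started_in_context
```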
- async stop()
Stop the Golem engine after it was started in non-contextmanager mode.
Details: see Golem.start()
- Return type
None
- async execute_tasks(worker, data, payload, max_workers=None, timeout=None, job_id=None, implicit_init=True)
Submit a sequence of tasks to be executed on providers.
Internally, this method creates an instance of yapapi.executor.Executor and calls its submit() method with given worker function and sequence of tasks.
- Parameters
worker (Callable[[WorkContext, AsyncIterator[Task[TypeVar(D), TypeVar(R)]]], AsyncGenerator[Script, Awaitable[List[CommandEvent]]]]) – an async generator that takes a WorkContext object and a sequence of tasks, and generates a sequence of scripts to be executed on providers in order to compute given tasks
data (Union[AsyncIterator[Task[TypeVar(D), TypeVar(R)]], Iterable[Task[TypeVar(D), TypeVar(R)]]]) – an iterable or an async generator of Task objects to be computed on providers
payload (Payload) – specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the created Executor instance
max_workers (Optional[int]) – maximum number of concurrent workers, passed to the Executor instance
timeout (Optional[timedelta]) – timeout for computing all tasks, passed to the Executor instance
job_id (Optional[str]) – an optional string to identify the job created by this method. Passed as the value of the id parameter to yapapi.engine.Job.
implicit_init (bool) – True -> deploy() and start() will be called internally by the Executor. False -> those calls must be in the worker function
- Return type
AsyncIterator[Task[TypeVar(D), TypeVar(R)]]
- Returns
an async iterator that yields completed Task objects
example usage:
    async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
        async for task in tasks:
            context.run("/bin/sh", "-c", "date")

            future_results = yield context.commit()
            results = await future_results
            task.accept_result(result=results[-1])

    package = await vm.repo(
        image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
    )

    async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
        async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
            print(completed.result.stdout)
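The worker above relies on a two-way async generator: it yields a batch and receives an awaitable for that batch's results from whoever drives it. The following stdlib-only sketch shows that mechanism in isolation. run_engine, the list-of-strings batches and the "ran ..." results are hypothetical stand-ins for the Executor, Script objects and command events; this is an illustration of the pattern, not yapapi's engine.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Dict, List

Batch = List[str]  # hypothetical stand-in for a Script


async def worker(
    tasks: List[str], accepted: Dict[str, str]
) -> AsyncGenerator[Batch, Awaitable[List[str]]]:
    for task in tasks:
        batch = ["date", task]
        # Yield the batch; the driver sends back an awaitable for its results.
        future_results = yield batch
        results = await future_results
        accepted[task] = results[-1]  # plays the role of task.accept_result(...)


async def run_engine(tasks: List[str], accepted: Dict[str, str]) -> List[Batch]:
    loop = asyncio.get_running_loop()
    gen = worker(tasks, accepted)
    executed: List[Batch] = []
    try:
        batch = await gen.__anext__()          # run the worker up to its first yield
        while True:
            executed.append(batch)
            future: asyncio.Future = loop.create_future()
            future.set_result([f"ran {cmd}" for cmd in batch])  # pretend execution
            batch = await gen.asend(future)    # resume the worker with the awaitable
    except StopAsyncIteration:
        pass                                   # the worker finished cleanly
    return executed
```

The value sent with asend() becomes the result of the yield expression inside the worker, which is why the worker can await it to obtain the results.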
- async run_service(service_class, num_instances=None, instance_params=None, payload=None, expiration=None, respawn_unstarted_instances=True, network=None, network_addresses=None)
Run a number of instances of a service represented by a given Service subclass.
- Parameters
service_class (Type[TypeVar(ServiceType, bound=Service)]) – a subclass of Service that represents the service to be run
num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will be created with as many instances as there are elements in the instance_params iterable. If num_instances is set to < 1, the Cluster will still be created but no instances will be spawned within it.
instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the Cluster will be created with the number of instances determined by num_instances and if there are too few elements in the instance_params iterable, it will result in an error.
payload (Optional[Payload]) – optional runtime definition for the service; if not provided, the payload specified by the get_payload() method of service_class is used
expiration (Optional[datetime]) – optional expiration datetime for the service
respawn_unstarted_instances – if an instance fails in the starting state, should the returned Cluster try to spawn another instance
network (Optional[Network]) – optional Network, representing a VPN to attach this Cluster’s instances to
network_addresses (Optional[List[str]]) – optional list of addresses to assign to consecutive spawned instances. If there are too few addresses given in the network_addresses iterable to satisfy all spawned instances, the rest (or all when the list is empty or not provided at all) of the addresses will be assigned automatically. Requires the network argument to be provided at the same time.
example usage:
    DATE_OUTPUT_PATH = "/golem/work/date.txt"
    REFRESH_INTERVAL_SEC = 5

    class DateService(Service):
        @staticmethod
        async def get_payload():
            return await vm.repo(
                image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
            )

        async def start(self):
            async for script in super().start():
                yield script

            # every `REFRESH_INTERVAL_SEC` write output of `date` to `DATE_OUTPUT_PATH`
            script = self._ctx.new_script()
            script.run(
                "/bin/sh",
                "-c",
                f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
            )
            yield script

        async def run(self):
            while True:
                await asyncio.sleep(REFRESH_INTERVAL_SEC)
                script = self._ctx.new_script()
                future_result = script.run(
                    "/bin/sh",
                    "-c",
                    f"cat {DATE_OUTPUT_PATH}",
                )

                yield script

                result = (await future_result).stdout
                print(result.strip() if result else "")

    async def main():
        async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
            cluster = await golem.run_service(DateService, num_instances=1)
            start_time = datetime.now()

            while datetime.now() < start_time + timedelta(minutes=1):
                for num, instance in enumerate(cluster.instances):
                    print(f"Instance {num} is {instance.state.value} on {instance.provider_name}")
                await asyncio.sleep(REFRESH_INTERVAL_SEC)
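The interplay between num_instances and instance_params described in the parameter list can be restated as a small pure function. plan_instances is a hypothetical helper written only from the rules above; the use of ValueError for the "too few elements" case and the decision to ignore surplus elements are assumptions of this sketch, since the text does not say which exception is raised or what happens to extra parameters.

```python
from typing import Dict, Iterable, List, Optional


def plan_instances(
    num_instances: Optional[int] = None,
    instance_params: Optional[Iterable[Dict]] = None,
) -> List[Dict]:
    """Return the keyword arguments for each instance that would be spawned."""
    if num_instances is not None and num_instances < 1:
        return []  # nothing is spawned
    if instance_params is None:
        # Default: a single instance, or num_instances instances without parameters.
        return [{} for _ in range(1 if num_instances is None else num_instances)]
    params = iter(instance_params)
    if num_instances is None:
        return list(params)  # one instance per element
    planned: List[Dict] = []
    for _ in range(num_instances):  # num_instances takes precedence
        try:
            planned.append(next(params))
        except StopIteration:
            # Assumed exception type; the text only says "an error".
            raise ValueError("too few elements in instance_params") from None
    return planned
```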
- async create_network(ip, owner_ip=None, mask=None, gateway=None)
Create a VPN inside Golem network.
Requires yagna >= 0.8
- Parameters
ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”
owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created Network
mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)
gateway (Optional[str]) – Optional gateway address for the network
- Return type
Task API
Task
- class yapapi.Task(data)
One computation unit.
Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).
- __init__(data)
Create a new Task object.
- Parameters
data (TypeVar(TaskData)) – contains information needed to prepare command list for the provider
- property running_time: Optional[datetime.timedelta]
Return the running time of the task (if in progress) or time it took to complete it.
- Return type
Optional[timedelta]
- accept_result(result=None)
Accept the result of this task.
Must be called when the result is correct to mark this task as completed.
- Parameters
result (Optional[TypeVar(TaskResult)]) – task computation result (optional)
- Return type
None
- reject_result(reason=None, retry=False)
Reject the result of this task.
Must be called when the result is not correct to indicate that the task should be retried.
- Parameters
reason (Optional[str]) – task rejection description (optional)
- Return type
None
Service API
Service
- class yapapi.services.Service
Base class for service specifications.
To be extended by application developers to define their own, specialized Service specifications.
- property id: str
Return the unique id of this service instance.
- Return type
str
- property provider_name: Optional[str]
Return the name of the provider that runs this service instance.
- Return type
Optional[str]
- property network: Optional[yapapi.network.Network]
Return the Network to which this instance belongs (if any)
- Return type
Optional[Network]
- property network_node: Optional[yapapi.network.Node]
Return the network Node record associated with this instance.
- Return type
Optional[Node]
- async send_message(message=None)
Send a control message to this instance.
- send_message_nowait(message=None)
Send a control message to this instance without blocking.
May raise asyncio.QueueFull if the channel for sending control messages is full.
- async receive_message()
Wait for a control message sent to this instance.
- Return type
ServiceSignal
- receive_message_nowait()
Retrieve a control message sent to this instance.
Return None if no message is available.
- Return type
Optional[ServiceSignal]
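The four message methods behave much like a bounded asyncio.Queue: the blocking variants wait, send_message_nowait may raise asyncio.QueueFull, and receive_message_nowait returns None when nothing is waiting. The sketch below assumes exactly that. MiniControlChannel is a hypothetical class built on asyncio.Queue, and whether yapapi uses one queue per direction or a different structure is not stated in this reference.

```python
import asyncio
from typing import Any, Optional


class MiniControlChannel:
    """Hypothetical control channel backed by a bounded asyncio.Queue."""

    def __init__(self, maxsize: int = 1) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def send_message(self, message: Any = None) -> None:
        await self._queue.put(message)       # waits while the queue is full

    def send_message_nowait(self, message: Any = None) -> None:
        self._queue.put_nowait(message)      # raises asyncio.QueueFull when full

    async def receive_message(self) -> Any:
        return await self._queue.get()       # waits until a message arrives

    def receive_message_nowait(self) -> Optional[Any]:
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None                      # no message available


async def demo():
    channel = MiniControlChannel(maxsize=1)
    nothing_yet = channel.receive_message_nowait()
    channel.send_message_nowait("ping")
    try:
        channel.send_message_nowait("pong")
        overflowed = False
    except asyncio.QueueFull:
        overflowed = True
    received = await channel.receive_message()
    return nothing_yet, overflowed, received
```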
- async static get_payload()
Return the payload (runtime) definition for this service.
To be overridden by the author of a specific Service class.
If get_payload() is not implemented, the payload will need to be provided in the run_service() call.
- Return type
Optional[Payload]
- async start()
Implement the handler for the starting state of the service.
To be overridden by the author of a specific Service class.
Should perform the minimum set of operations after which the instance of a service can be treated as “started”, or, in other words, ready to receive service requests. It’s up to the developer of the specific Service class to decide what exact operations constitute a service startup. In the most common scenario deploy() and start() are required, check the Default implementation section for more details.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.
Results of those batches can then be retrieved by awaiting the values captured from yield statements.
A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to running.
On the other hand, any unhandled exception will cause the instance to be either retried on another provider node, if the Cluster’s respawn_unstarted_instances argument is set to True in run_service(), which is also the default behavior, or altogether terminated, if respawn_unstarted_instances is set to False.
Example:
    async def start(self):
        s = self._ctx.new_script()
        # deploy the exe-unit
        s.deploy(**self.get_deploy_args())
        # start the exe-unit's container
        s.start()
        # start some service process within the container
        s.run("/golem/run/service_ctl", "--start")
        # send the batch to the provider
        yield s
Default implementation
The default implementation assumes that, in order to accept commands, the runtime needs to be first deployed using the deploy() command, which is analogous to creation of a container corresponding with the desired payload, and then started using the start() command, actually launching the process that runs the aforementioned container.
Additionally, it also assumes that the exe-unit doesn’t need any additional parameters in its start() call (e.g. for the VM runtime, all the required parameters are already passed as part of the agreement between the requestor and the provider), and parameters passed to deploy() are returned by Service.get_deploy_args() method.
Therefore, this default implementation performs the minimum required for a VM payload to start responding to run commands. If your service requires any additional operations - you’ll need to override this method (possibly first yielding from the parent - super().start() - generator) to add appropriate preparatory steps.
In case of runtimes other than VM, deploy and/or start might be optional or altogether disallowed, or they may take some parameters. It is up to the author of the specific Service implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type.
- Return type
AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
- async run()
Implement the handler for the running state of the service.
To be overridden by the author of a specific Service class.
Should contain any operations needed to ensure continuous operation of a service.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.
Results of those batches can then be retrieved by awaiting the values captured from yield statements.
A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to stopping.
Any unhandled exception will cause the instance to be terminated.
Example:
    async def run(self):
        while True:
            script = self._ctx.new_script()
            stats_results = script.run(self.SIMPLE_SERVICE, "--stats")
            yield script
            stats = (await stats_results).stdout.strip()
            print(f"stats: {stats}")
Default implementation
Because the nature of the operations required during the “running” state depends directly on the specifics of a given Service and because it’s entirely plausible for a service not to require any direct interaction with the exe-unit (runtime) from the requestor’s end after the service has been started, the default is to just wait indefinitely without producing any batches.
- Return type
AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
- async shutdown()
Implement the handler for the stopping state of the service.
To be overridden by the author of a specific Service class.
Should contain any operations that the requestor needs to ensure the instance is correctly and gracefully shut-down - e.g. that its final state is retrieved.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.
Results of those batches can then be retrieved by awaiting the values captured from yield statements.
Finishing the execution of this handler will trigger termination of this instance.
This handler will only be called if the activity running the service is still available. If the activity has already been deemed terminated or if the connection with the provider has been lost, the service will transition to the terminated state and the shutdown handler won’t be run.
Example:
    async def shutdown(self):
        self._ctx.run("/golem/run/dump_state")
        self._ctx.download_file("/golem/output/state", "/some/local/path/state")
        self._ctx.terminate()
        yield self._ctx.commit()
Default implementation
By default, the activity is just sent a terminate command. Whether it’s absolutely required or not, again, depends on the implementation of the given runtime.
- Return type
AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
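The transitions described for the start(), run() and shutdown() handlers can be summarised as a small state function. This is a sketch under stated assumptions: the State enum and next_state are hypothetical, only the four states named in this reference are modelled, and a respawned instance is represented simply as going back to the starting state (the text says it is retried on another provider node).

```python
from enum import Enum


class State(Enum):
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    TERMINATED = "terminated"


def next_state(
    state: State,
    handler_failed: bool,
    respawn_unstarted_instances: bool = True,
) -> State:
    """Return the state that follows once the handler for `state` has finished."""
    if state is State.STARTING:
        if not handler_failed:
            return State.RUNNING  # clean exit from start()
        # Unhandled exception in start(): retry elsewhere or give up.
        return State.STARTING if respawn_unstarted_instances else State.TERMINATED
    if state is State.RUNNING:
        # Clean exit from run() leads to stopping; an exception terminates.
        return State.TERMINATED if handler_failed else State.STOPPING
    # Finishing shutdown() (or being terminated already) ends the instance.
    return State.TERMINATED
```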
- async reset()
Reset the service to the initial state.
This method is called internally when the service is restarted in yapapi.services.ServiceRunner, so it is not necessary for services that are never restarted (note that run_service() by default restarts services that didn’t start properly).
Handlers of a restarted service are called more than once - all of the cleanup necessary between calls should be implemented here. E.g. if we initialize a counter in Service.__init__() and increment it in Service.start(), we might want to reset it here to the initial value.
Target implementation (0.10.0 and up) will raise NotImplementedError. Current implementation only warns about this future change.
- Return type
None
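The counter example mentioned for reset() might look roughly like this. CountingService is a hypothetical plain class, not a yapapi Service subclass, and its start() is simplified to an ordinary coroutine instead of a script-yielding generator; the point is only that state set in __init__() and changed in start() is restored by reset() before the handlers run again.

```python
import asyncio


class CountingService:
    """Hypothetical service-like class with a counter touched by start()."""

    def __init__(self) -> None:
        self.start_calls = 0

    async def start(self) -> None:
        self.start_calls += 1

    async def reset(self) -> None:
        # Restore the value assigned in __init__ so a restart begins clean.
        self.start_calls = 0


async def restart_once(service: CountingService) -> int:
    await service.start()   # first attempt
    await service.reset()   # what a restart would call in between
    await service.start()   # second attempt
    return service.start_calls
```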
- property is_available
Return True iff this instance is available (that is, starting, running or stopping).
- property state
Return the current state of this instance.
Cluster
- class yapapi.services.Cluster(engine, service_class, payload, expiration=None, respawn_unstarted_instances=True, network=None)
Golem’s sub-engine used to spawn and control instances of a single Service.
- property expiration: datetime.datetime
Return the expiration datetime for agreements related to services in this Cluster.
- Return type
datetime
- property payload: yapapi.payload.Payload
Return the service runtime definition for this Cluster.
- Return type
Payload
- property service_class: Type[yapapi.services.service.ServiceType]
Return the class instantiated by all service instances in this Cluster.
- Return type
Type[TypeVar(ServiceType, bound=Service)]
- property network: Optional[yapapi.network.Network]
Return the Network record associated with the VPN used by this Cluster.
- Return type
Optional[Network]
- property instances: List[yapapi.services.service.ServiceType]
Return the list of service instances in this Cluster.
- Return type
List[TypeVar(ServiceType, bound=Service)]
- spawn_instances(num_instances=None, instance_params=None, network_addresses=None)
Spawn new instances within this Cluster.
- Parameters
num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will spawn as many instances as there are elements in the instance_params iterable. If num_instances is not None and < 1, the method will immediately return and log a warning.
instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the number of instances spawned will be equal to num_instances and if there are too few elements in the instance_params iterable, it will result in an error.
network_addresses (Optional[List[str]]) – optional list of network addresses in case the Cluster is attached to VPN. If the list is not provided (or if the number of elements is less than the number of spawned instances), any instances for which the addresses have not been given, will be assigned an address automatically.
- Return type
None
ServiceState
Network API
Network
- class yapapi.network.Network(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)
Describes a VPN created between the requestor and the provider nodes within Golem Network.
- async classmethod create(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)
Create a new VPN.
- Parameters
net_api (Net) – the mid-level binding used directly to perform calls to the REST API.
ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”
owner_id (str) – the node ID of the owner of this VPN (the requestor)
owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created network
mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)
gateway (Optional[str]) – Optional gateway address for the network
- Return type
- property owner_ip: str
The IP address of the requestor node within the network.
- Return type
str
- property network_address: str
The network address of this network, without a netmask.
- Return type
str
- property netmask: str
The netmask of this network.
- Return type
str
- property gateway: Optional[str]
The gateway address within this network, if provided.
- Return type
Optional[str]
- property nodes_dict: Dict[str, str]
Mapping between the IP addresses and Node IDs of the nodes within this network.
- Return type
Dict[str, str]
- property network_id: str
The automatically-generated, unique ID of this VPN.
- Return type
str
- async add_owner_address(ip)
Assign the given IP address to the requestor in the network.
- Parameters
ip (str) – the IP address to assign to the requestor node.
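The relationship between the ip and mask arguments and the network_address, netmask, owner_ip and nodes_dict properties can be illustrated with Python's ipaddress module. MiniNetwork and its add_node method are hypothetical; in particular, giving the owner the first free host address when owner_ip is omitted is an assumption of this sketch, and no REST calls are made.

```python
import ipaddress
from typing import Dict, Optional


class MiniNetwork:
    """Hypothetical, offline model of the address bookkeeping of a VPN."""

    def __init__(
        self,
        ip: str,
        owner_id: str,
        owner_ip: Optional[str] = None,
        mask: Optional[str] = None,
    ) -> None:
        # The netmask may be part of `ip` ("192.168.0.0/24") or given separately.
        cidr = f"{ip}/{mask}" if mask else ip
        self._net = ipaddress.ip_network(cidr, strict=False)
        self._free_hosts = iter(self._net.hosts())
        self.nodes_dict: Dict[str, str] = {}  # IP address -> node ID
        self.owner_ip = self.add_node(owner_id, owner_ip)

    @property
    def network_address(self) -> str:
        return str(self._net.network_address)

    @property
    def netmask(self) -> str:
        return str(self._net.netmask)

    def add_node(self, node_id: str, ip: Optional[str] = None) -> str:
        if ip is None:
            # Assumed policy: take the next host address that is not in use.
            ip = next(str(h) for h in self._free_hosts if str(h) not in self.nodes_dict)
        elif ipaddress.ip_address(ip) not in self._net:
            raise ValueError(f"{ip} is outside {self._net}")
        elif ip in self.nodes_dict:
            raise ValueError(f"{ip} is already assigned")
        self.nodes_dict[ip] = node_id
        return ip
```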
Node
- class yapapi.network.Node(network, node_id, ip)
Describes a node in a VPN, mapping a Golem node id to an IP address.
- network: yapapi.network.Network
The Network (the specific VPN) this node is part of.
- node_id: str
Golem id of the node.
- ip: str
IP address of this node in this particular VPN.
- get_deploy_args()
Generate a dictionary of arguments that are required for the appropriate Deploy command of an exescript in order to pass the network configuration to the runtime on the provider’s end.
- Return type
Dict
Exceptions
Payload definition
Payload
- class yapapi.payload.Payload(**kwargs)
Base class for descriptions of the payload required by the requestor.
example usage:
    import asyncio

    from dataclasses import dataclass
    from yapapi.props.builder import DemandBuilder
    from yapapi.props.base import prop, constraint
    from yapapi.props import inf

    from yapapi.payload import Payload

    CUSTOM_RUNTIME_NAME = "my-runtime"
    CUSTOM_PROPERTY = "golem.srv.app.myprop"

    @dataclass
    class MyPayload(Payload):
        myprop: str = prop(CUSTOM_PROPERTY, default="myvalue")
        runtime: str = constraint(inf.INF_RUNTIME_NAME, default=CUSTOM_RUNTIME_NAME)
        min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16)
        min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024)

    async def main():
        builder = DemandBuilder()
        payload = MyPayload(myprop="othervalue", min_mem_gib=32)
        await builder.decorate(payload)
        print(builder)

    asyncio.run(main())
output:
{'properties': {'golem.srv.app.myprop': 'othervalue'}, 'constraints': ['(&(golem.runtime.name=my-runtime) (golem.inf.mem.gib>=32) (golem.inf.storage.gib>=1024))']}
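The constraints entry in the output above is a conjunction of parenthesised clauses wrapped in "(&...)". As a rough illustration of how such a string is put together, the hypothetical helper below joins (name, operator, value) triples in that shape. It joins clauses with a single space to mirror the output as printed here, and it is not the DemandBuilder implementation.

```python
from typing import List, Tuple


def join_constraints(items: List[Tuple[str, str, object]]) -> str:
    """Hypothetical helper: combine clauses into one '(&...)' expression."""
    clauses = [f"({name}{operator}{value})" for name, operator, value in items]
    return "(&" + " ".join(clauses) + ")"


expression = join_constraints(
    [
        ("golem.runtime.name", "=", "my-runtime"),
        ("golem.inf.mem.gib", ">=", 32),
        ("golem.inf.storage.gib", ">=", 1024),
    ]
)
```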
Package
- class yapapi.payload.package.Package
Description of a task package (e.g. a VM image) deployed on the provider nodes
vm.repo
- async yapapi.payload.vm.repo(*, image_hash, image_url=None, min_mem_gib=0.5, min_storage_gib=2.0, min_cpu_threads=1, capabilities=None)
Build a reference to application package.
- Parameters
image_hash (str) – hash of the package’s image
image_url (Optional[str]) – URL of the package’s image
min_mem_gib (float) – minimal memory required to execute application code
min_storage_gib (float) – minimal disk storage to execute tasks
min_cpu_threads (int) – minimal available logical CPU cores
capabilities (Optional[List[Literal[‘vpn’]]]) – an optional list of required vm capabilities
- Return type
- Returns
the payload definition for the given VM image
example usage:
    package = await vm.repo(
        # if we provide only the image hash, the image will be
        # automatically pulled from Golem's image repository
        image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
    )
example usage with an explicit GVMI image URL (useful to host images outside the Golem repository):
    package = await vm.repo(
        # we still need to provide the image's hash because
        # the image's integrity is validated by the runtime on the provider node
        #
        # the hash can be calculated by running `sha3sum -a 224 <image_filename.gvmi>`
        #
        image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
        # the URL can point to any publicly-available location on the web
        image_url="http://girepo.dev.golem.network:8000/docker-golem-hello-world-latest-779758b432.gvmi",
    )
example usage with additional constraints:
    package = await vm.repo(
        image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
        # only run on provider nodes that have at least 0.5gb of RAM available
        min_mem_gib=0.5,
        # only run on provider nodes that have at least 2gb of storage space available
        min_storage_gib=2.0,
        # only run on provider nodes with at least this many CPU threads available
        min_cpu_threads=1,
    )
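The comment in the second example mentions computing the hash with `sha3sum -a 224`. Assuming the image hash is the plain SHA3-224 digest of the image file, as that command suggests, a comparable value could be computed in Python with hashlib. The function name sha3_224_of_stream is hypothetical, and the in-memory bytes below merely stand in for a real .gvmi file.

```python
import hashlib
import io
from typing import BinaryIO


def sha3_224_of_stream(stream: BinaryIO, chunk_size: int = 1 << 20) -> str:
    """Return the SHA3-224 hex digest of a binary stream, read in chunks."""
    digest = hashlib.sha3_224()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# With a real image one would pass open("<image_filename.gvmi>", "rb") instead.
example_hash = sha3_224_of_stream(io.BytesIO(b"hello"))
```

The digest is 56 hexadecimal characters long, the same length as the image_hash values used in the examples above.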
Execution control
WorkContext
- class yapapi.WorkContext(activity, agreement, storage, emitter)
Provider node’s work context.
Used to schedule commands to be sent to the provider and enable other interactions between the requestor agent’s client code and the activity on provider’s end.
- property id: str
Unique identifier for this work context.
- Return type
str
- property provider_name: Optional[str]
Return the name of the provider associated with this work context.
- Return type
Optional[str]
- property provider_id: str
Return the id of the provider associated with this work context.
- Return type
str
- new_script(timeout=None, wait_for_results=True)
Create an instance of Script attached to this WorkContext instance.
This is equivalent to calling Script(work_context). This method is intended to provide a direct link between the two object instances.
- Return type
Script
- async get_raw_usage()
Get the raw usage vector for the activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
- Return type
ActivityUsage
- async get_usage()
Get the current usage for the activity bound to this work context.
- Return type
ActivityUsage
- async get_raw_state()
Get the state of the activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
- Return type
ActivityState
- async get_cost()
Get the accumulated cost of the activity based on the reported usage.
- Return type
Optional[float]
Script
- class yapapi.script.Script(context, timeout=None, wait_for_results=True)
Represents a series of commands to be executed on a provider node.
New commands are added to the script either through its add() method or by calling one of the convenience methods provided (for example: run() or upload_json()). Adding a new command does not result in it being immediately executed. Once ready, a Script instance is meant to be yielded from a worker function (work generator pattern). Commands will be run in the order in which they were added to the script.
- __init__(context, timeout=None, wait_for_results=True)
Initialize a Script
- Parameters
context (WorkContext) – A yapapi.WorkContext that will be used to evaluate the script (i.e. to send commands to the provider)
timeout (Optional[timedelta]) – Time after which this script’s execution should be forcefully interrupted. The default value is None which means there’s no timeout set.
wait_for_results (bool) – Whether this script’s execution should block until its results are available. The default value is True.
- property id: int
Return the ID of this Script instance.
IDs are provided by a global iterator and therefore are guaranteed to be unique during the program’s execution.
- Return type
int
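Two properties of Script described above lend themselves to a tiny model: commands are only queued (in order) when added, and each instance takes its ID from a shared iterator. MiniScript below is a hypothetical stand-in that records command names and arguments; it uses itertools.count as the global iterator, which is an assumption about the mechanism, and it never talks to a provider.

```python
import itertools
from typing import List, Tuple

# Shared counter: every MiniScript created in this process gets a distinct ID.
_script_ids = itertools.count(1)


class MiniScript:
    """Hypothetical model of a script as an ordered queue of commands."""

    def __init__(self) -> None:
        self.id = next(_script_ids)
        self.commands: List[Tuple[str, tuple]] = []

    def add(self, name: str, *args) -> None:
        # Adding only records the command; nothing is executed here.
        self.commands.append((name, args))

    def run(self, cmd: str, *args: str) -> None:
        self.add("run", cmd, *args)

    def upload_json(self, data: dict, dst_path: str) -> None:
        self.add("upload_json", data, dst_path)
```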
- add(cmd)
Add a yapapi.script.command.Command to the Script
- Return type
Awaitable[CommandExecuted]
- deploy(**kwargs)
Schedule a Deploy command on the provider.
- Return type
Awaitable[CommandExecuted]
- start(*args)
Schedule a Start command on the provider.
- Return type
Awaitable[CommandExecuted]
- terminate()
Schedule a Terminate command on the provider.
- Return type
Awaitable[CommandExecuted]
- run(cmd, *args, env=None, stderr=None, stdout=None)
Schedule running a shell command on the provider.
- Parameters
cmd (str) – command to run on the provider, e.g. /my/dir/run.sh
args (str) – command arguments, e.g. “input1.txt”, “output1.txt”
env (Optional[Dict[str, str]]) – optional dictionary with environment variables
stderr (Optional[CaptureContext]) – capture context to use for stderr
stdout (Optional[CaptureContext]) – capture context to use for stdout
- Return type
Awaitable[CommandExecuted]
- download_bytes(src_path, on_download, limit=1048576)
Schedule downloading a remote file from the provider as bytes.
- Parameters
src_path (str) – remote (provider) source path
on_download (Callable[[bytes], Awaitable]) – the callable to run on the received data
limit (int) – limit of bytes to be downloaded (expected size)
- Return type
Awaitable[CommandExecuted]
- download_file(src_path, dst_path)
Schedule downloading a remote file from the provider.
- Parameters
src_path (str) – remote (provider) source path
dst_path (str) – local (requestor) destination path
- Return type
Awaitable[CommandExecuted]
- download_json(src_path, on_download, limit=1048576)
Schedule downloading a remote file from the provider as JSON.
- Parameters
src_path (str) – remote (provider) source path
on_download (Callable[[Any], Awaitable]) – the callable to run on the received data
limit (int) – limit of bytes to be downloaded (expected size)
- Return type
Awaitable[CommandExecuted]
- upload_bytes(data, dst_path)
Schedule sending bytes data to the provider.
- Parameters
data (bytes) – bytes to send
dst_path (str) – remote (provider) destination path
- Return type
Awaitable[CommandExecuted]
- upload_file(src_path, dst_path)
Schedule sending a file to the provider.
- Parameters
src_path (str) – local (requestor) source path
dst_path (str) – remote (provider) destination path
- Return type
Awaitable[CommandExecuted]
- upload_json(data, dst_path)
Schedule sending JSON data to the provider.
- Parameters
data (dict) – dictionary representing JSON data to send
dst_path (str) – remote (provider) destination path
- Return type
Awaitable[CommandExecuted]
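The deferred behaviour described for Script (adding a command only queues it, and queued commands later run in the order they were added) can be illustrated without the library. The sketch below is a toy model only: MiniScript and its execute() method are hypothetical stand-ins, not yapapi classes, and execute() merely plays the role of yielding the script to the engine.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class MiniScript:
    """Hypothetical stand-in for a script: queues commands, runs them later."""

    def __init__(self) -> None:
        self._queue: List[Tuple[Callable[[], str], asyncio.Future]] = []

    def add(self, cmd: Callable[[], str]) -> Awaitable[str]:
        # Adding only records the command; nothing is executed yet.
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.append((cmd, future))
        return future

    def execute(self) -> None:
        # Run commands in insertion order and resolve their awaitables.
        for cmd, future in self._queue:
            future.set_result(cmd())


async def demo() -> List[str]:
    log: List[str] = []
    script = MiniScript()
    first = script.add(lambda: log.append("deploy") or "deploy done")
    second = script.add(lambda: log.append("run") or "run done")
    assert log == []   # nothing ran when the commands were added
    script.execute()   # stands in for yielding the script to the engine
    return log + [await first, await second]


result = asyncio.run(demo())
```

In this model the awaitables returned by add() resolve only after the whole script has been executed, which mirrors the role of the Awaitable[CommandExecuted] return type listed for the methods above.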
Market strategies
- class yapapi.strategy.MarketStrategy
Abstract market strategy.
- async decorate_demand(demand)
Optionally add relevant constraints to a Demand.
- Return type
None
- abstract async score_offer(offer)
Score offer. Better offers should get higher scores.
- Return type
float
- class yapapi.strategy.DummyMS(max_fixed_price=Decimal('0.05'), max_price_for=mappingproxy({}), activity=None)
A default market strategy implementation.
Its score_offer() method returns SCORE_NEUTRAL for every offer with prices that do not exceed maximum prices specified for each counter. For all other offers, it returns SCORE_REJECTED.
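The price-cap rule described for DummyMS can be sketched in plain Python. This is an illustration of the rule only, not the library's code: score_by_price_caps and its argument layout are hypothetical, and the two constants are placeholder values (the real ones are defined in yapapi.strategy).

```python
from decimal import Decimal
from typing import Dict

# Placeholder values for this sketch; the real constants live in yapapi.strategy.
SCORE_NEUTRAL = 0.0
SCORE_REJECTED = -1.0


def score_by_price_caps(
    fixed_price: Decimal,
    counter_prices: Dict[str, Decimal],
    max_price_for: Dict[str, Decimal],
    max_fixed_price: Decimal = Decimal("0.05"),
) -> float:
    """Reject an offer if any of its prices exceeds the configured cap."""
    if fixed_price > max_fixed_price:
        return SCORE_REJECTED
    for counter, price in counter_prices.items():
        cap = max_price_for.get(counter)
        # Counters without a configured cap are not restricted in this sketch.
        if cap is not None and price > cap:
            return SCORE_REJECTED
    return SCORE_NEUTRAL


caps = {"cpu_sec": Decimal("0.001")}
cheap = score_by_price_caps(Decimal("0.01"), {"cpu_sec": Decimal("0.0005")}, caps)
pricey = score_by_price_caps(Decimal("0.01"), {"cpu_sec": Decimal("0.002")}, caps)
```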
- class yapapi.strategy.LeastExpensiveLinearPayuMS(expected_time_secs=60, max_fixed_price=Decimal('Infinity'), max_price_for=mappingproxy({}))
A strategy that scores offers according to cost for given computation time.
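To make "cost for given computation time" concrete, the sketch below compares two offers under a simple linear pay-per-use model: a fixed price plus a per-second price multiplied by the expected run time. The expected_cost helper, the provider names and the prices are all made up for illustration; how the strategy turns such a cost into a score is not shown here.

```python
from decimal import Decimal
from typing import Dict, Tuple


def expected_cost(
    fixed_price: Decimal, price_per_second: Decimal, expected_time_secs: int = 60
) -> Decimal:
    """Linear pay-per-use cost: start fee plus usage over the expected time."""
    return fixed_price + price_per_second * expected_time_secs


# (fixed price, price per second) for two hypothetical offers
offers: Dict[str, Tuple[Decimal, Decimal]] = {
    "provider-a": (Decimal("0.01"), Decimal("0.0002")),
    "provider-b": (Decimal("0"), Decimal("0.0005")),
}
costs = {name: expected_cost(*prices) for name, prices in offers.items()}
cheapest = min(costs, key=costs.get)
```

With the default 60 seconds, provider-a costs 0.022 and provider-b costs 0.03, so the offer with the higher fixed price is still the cheaper one overall.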
- class yapapi.strategy.DecreaseScoreForUnconfirmedAgreement(base_strategy, factor)
A market strategy that modifies a base strategy based on history of agreements.
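Going only by the class name and its base_strategy and factor parameters, one plausible reading is that a provider's base score is multiplied by factor while that provider has an unconfirmed agreement on record. The sketch below models that reading; ScoreDamper and its method names are hypothetical, and the library's actual bookkeeping may differ.

```python
from typing import Set


class ScoreDamper:
    """Hypothetical model: scale scores of providers that did not confirm."""

    def __init__(self, factor: float) -> None:
        self.factor = factor
        self._unconfirmed: Set[str] = set()

    def on_agreement_rejected(self, provider_id: str) -> None:
        # Remember providers that failed to confirm an agreement.
        self._unconfirmed.add(provider_id)

    def on_agreement_confirmed(self, provider_id: str) -> None:
        # A later confirmation clears the record.
        self._unconfirmed.discard(provider_id)

    def adjust(self, provider_id: str, base_score: float) -> float:
        if provider_id in self._unconfirmed:
            return base_score * self.factor
        return base_score


damper = ScoreDamper(factor=0.5)
before = damper.adjust("p1", 0.8)
damper.on_agreement_rejected("p1")
penalised = damper.adjust("p1", 0.8)
damper.on_agreement_confirmed("p1")
restored = damper.adjust("p1", 0.8)
```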
Events
Objects representing events in a Golem computation.
Every time something important happens, an event is emitted.
Emitted events are passed to all current event consumers, set either in yapapi.Golem.__init__() or via yapapi.Golem.add_event_consumer().
Events are considered a semi-experimental feature:
Backward compatibility of future updates is not guaranteed; the interface might change between major releases without prior deprecation.
Some parts are not documented and should not be considered a public interface.
Every event is described by a set of attributes that can be divided into three groups:
Attributes common to all events, documented on yapapi.events.Event.
Internal parts of yapapi that define the context of the event (details in the next section).
Additional event-specific information, e.g. the reason for the agreement termination for the AgreementTerminated event, described along with the particular event classes.
Events should be consumed in a strict read-only mode: event objects are shared between all event consumers, and their attributes are used internally by the Golem engine, so any modification may have unexpected side effects.
Events inheritance tree
Only leaf events are ever emitted; other events (named *Event) are abstract classes.
Every abstract class has one more yapapi object attached than its parent, e.g.
JobEvent is an Event that happened in the context of a particular job
AgreementEvent is a JobEvent that happened in the context of a particular agreement
Event
    JobEvent
        SubscriptionFailed
        SubscriptionEvent
            SubscriptionCreated
            CollectFailed
        ProposalEvent
            ProposalReceived
            ProposalRejected
            ProposalResponded
            ProposalConfirmed
            ProposalFailed
        NoProposalsConfirmed
        JobStarted
        JobFinished
        AgreementEvent
            AgreementCreated
            AgreementConfirmed
            AgreementRejected
            AgreementTerminated
            ActivityCreateFailed
            WorkerStarted
            ActivityEvent
                ActivityCreated
                TaskEvent
                    TaskStarted
                    TaskFinished
                    TaskAccepted
                    TaskRejected
                ServiceEvent
                    ServiceStarted
                    ServiceFinished
                ScriptEvent
                    ScriptSent
                    CommandEvent
                        CommandStarted
                        CommandStdOut
                        CommandStdErr
                        CommandExecuted
                        DownloadStarted
                        DownloadFinished
                    GettingResults
                    ScriptFinished
                WorkerFinished
            InvoiceEvent
                InvoiceReceived
                InvoiceAccepted
            DebitNoteEvent
                DebitNoteReceived
                DebitNoteAccepted
            PaymentFailed
    ExecutionInterrupted
    ShutdownFinished
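Because every abstract class adds one context object to its parent, a consumer can test against an abstract class to handle a whole branch of the tree at once. The sketch below shows the idea with small stand-in classes that reuse a few names from the tree; they are not the yapapi classes and take plain strings where the library passes its own objects.

```python
from typing import List


class Event:
    """Stand-in for the abstract base of all events."""


class JobEvent(Event):
    def __init__(self, job: str) -> None:
        self.job = job


class AgreementEvent(JobEvent):
    def __init__(self, job: str, agreement: str) -> None:
        super().__init__(job)
        self.agreement = agreement


class JobStarted(JobEvent):
    pass


class AgreementCreated(AgreementEvent):
    pass


class AgreementTerminated(AgreementEvent):
    pass


agreement_log: List[str] = []


def agreement_consumer(event: Event) -> None:
    # One isinstance check covers every leaf event below AgreementEvent.
    if isinstance(event, AgreementEvent):
        agreement_log.append(f"{type(event).__name__}:{event.agreement}")


for emitted in [
    JobStarted("job-1"),
    AgreementCreated("job-1", "agr-1"),
    AgreementTerminated("job-1", "agr-1"),
]:
    agreement_consumer(emitted)
```

The consumer only reads attributes, in line with the read-only rule stated earlier in this section.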
Custom events
Apart from consuming events emitted by the Python API’s internal components, application authors may wish to define their own, custom event classes. That way, Golem’s event system can be easily extended with additional, custom triggers while keeping the consumer logic uniform for both Golem’s own events and the custom ones.
Additionally, such custom events may be used to create useful, reusable components while keeping the emitting and consuming logic separate. As an example, one could provide a custom MarketStrategy that consumes an event such as ActivityEvaluated and have many different applications use this strategy but evaluate activities in different ways.
Example usage
Declare the class:
from yapapi.events import ActivityEvent
import attr

@attr.s(auto_attribs=True, repr=False)
class ActivityEvaluated(ActivityEvent):
    activity_score: float
Emit the event e.g. in the worker() function of the Task API:
async def worker(ctx: WorkContext, tasks):
    ... # whatever
    activity_score = await score_activity(ctx) # Some app-specific logic
    ctx.emit(ActivityEvaluated, activity_score=activity_score)
And consume the event anywhere you want (e.g. in a MarketStrategy method):
def event_consumer(event):
    if isinstance(event, ActivityEvaluated):
        if event.activity_score < 7:
            print(f"Oh no! Activity {event.activity.id} is scored below 7!")

golem.add_event_consumer(event_consumer)
List of event classes
- class yapapi.events.Event(*, exc_info=None)
An abstract base class for all types of events.
- exc_info: Optional[Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]]]
Tuple containing exception info as returned by sys.exc_info(), if applicable.
- timestamp: datetime.datetime
Event creation time
- property exception: Optional[BaseException]
Exception associated with this event or None
- Return type
Optional[BaseException]
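The relationship between exc_info and the exception property can be sketched as follows. SketchEvent is a hypothetical stand-in that only assumes what the attribute descriptions state: exc_info holds a tuple as returned by sys.exc_info(), and exception is the exception instance from that tuple or None.

```python
import sys
from types import TracebackType
from typing import Optional, Tuple, Type

ExcInfo = Tuple[Type[BaseException], BaseException, Optional[TracebackType]]


class SketchEvent:
    """Hypothetical stand-in exposing exc_info and a derived exception."""

    def __init__(self, exc_info: Optional[ExcInfo] = None) -> None:
        self.exc_info = exc_info

    @property
    def exception(self) -> Optional[BaseException]:
        # The second element of the exc_info tuple is the exception instance.
        return self.exc_info[1] if self.exc_info else None


try:
    raise ValueError("provider went away")
except ValueError:
    failed = SketchEvent(exc_info=sys.exc_info())

ok = SketchEvent()
```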
- class yapapi.events.JobEvent(job, *, exc_info=None)
- class yapapi.events.SubscriptionEvent(job, subscription, *, exc_info=None)
- class yapapi.events.ProposalEvent(job, proposal, *, exc_info=None)
- class yapapi.events.AgreementEvent(job, agreement, *, exc_info=None)
- class yapapi.events.ActivityEvent(job, agreement, activity, *, exc_info=None)
- class yapapi.events.TaskEvent(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.ServiceEvent(job, agreement, activity, service, *, exc_info=None)
- class yapapi.events.ScriptEvent(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.CommandEvent(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.InvoiceEvent(job, agreement, invoice, *, exc_info=None)
- class yapapi.events.DebitNoteEvent(job, agreement, debit_note, *, exc_info=None)
- class yapapi.events.JobStarted(job, *, exc_info=None)
- class yapapi.events.JobFinished(job, *, exc_info=None)
job is done; it succeeded if exception is None and failed otherwise.
- class yapapi.events.SubscriptionCreated(job, subscription, *, exc_info=None)
- class yapapi.events.SubscriptionFailed(job, reason, *, exc_info=None)
- class yapapi.events.CollectFailed(job, subscription, reason, *, exc_info=None)
- class yapapi.events.ProposalReceived(job, proposal, *, exc_info=None)
- class yapapi.events.ProposalRejected(job, proposal, reason=None, *, exc_info=None)
We decided to reject the provider’s proposal; reason describes why.
- class yapapi.events.ProposalResponded(job, proposal, *, exc_info=None)
- class yapapi.events.ProposalConfirmed(job, proposal, *, exc_info=None)
- class yapapi.events.ProposalFailed(job, proposal, *, exc_info=None)
- class yapapi.events.NoProposalsConfirmed(job, timeout, *, exc_info=None)
We didn’t confirm any proposal for a period of timeout.
- class yapapi.events.AgreementCreated(job, agreement, *, exc_info=None)
- class yapapi.events.AgreementConfirmed(job, agreement, *, exc_info=None)
- class yapapi.events.AgreementRejected(job, agreement, *, exc_info=None)
- class yapapi.events.AgreementTerminated(job, agreement, reason, *, exc_info=None)
- class yapapi.events.DebitNoteReceived(job, agreement, debit_note, *, exc_info=None)
- class yapapi.events.DebitNoteAccepted(job, agreement, debit_note, *, exc_info=None)
- class yapapi.events.PaymentFailed(job, agreement, *, exc_info=None)
- class yapapi.events.InvoiceReceived(job, agreement, invoice, *, exc_info=None)
- class yapapi.events.InvoiceAccepted(job, agreement, invoice, *, exc_info=None)
- class yapapi.events.WorkerStarted(job, agreement, *, exc_info=None)
- class yapapi.events.ActivityCreated(job, agreement, activity, *, exc_info=None)
- class yapapi.events.ActivityCreateFailed(job, agreement, *, exc_info=None)
- class yapapi.events.TaskStarted(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.TaskFinished(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.ServiceStarted(job, agreement, activity, service, *, exc_info=None)
- class yapapi.events.ServiceFinished(job, agreement, activity, service, *, exc_info=None)
- class yapapi.events.WorkerFinished(job, agreement, activity, *, exc_info=None)
- class yapapi.events.ScriptSent(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.GettingResults(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.ScriptFinished(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.CommandExecuted(job, agreement, activity, script, command, success, message, stdout=None, stderr=None, *, exc_info=None)
- class yapapi.events.CommandStarted(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.CommandStdOut(job, agreement, activity, script, command, output, *, exc_info=None)
- class yapapi.events.CommandStdErr(job, agreement, activity, script, command, output, *, exc_info=None)
- class yapapi.events.TaskAccepted(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.TaskRejected(job, agreement, activity, task, reason, *, exc_info=None)
- class yapapi.events.DownloadStarted(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.DownloadFinished(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.ShutdownFinished(*, exc_info=None)
Golem completed the shutdown sequence and is no longer operative
- class yapapi.events.ExecutionInterrupted(*, exc_info=None)
Golem was stopped by an unhandled exception in code not managed by yapapi
Exceptions
- exception yapapi.NoPaymentAccountError(required_driver, required_network)
The error raised if no payment account for the required driver/network is available.
- exception yapapi.rest.activity.BatchTimeoutError
An exception that indicates that an execution of a batch of commands timed out.
Logging
- yapapi.log.enable_default_logger(format_='[%(asctime)s %(levelname)s %(name)s] %(message)s', log_file=None, debug_activity_api=False, debug_market_api=False, debug_payment_api=False, debug_net_api=False)
Enable the default logger that logs messages to stderr with level INFO.
If log_file is specified, the logger will output messages with level DEBUG to the given file.
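The two-level behaviour described here (INFO to stderr, DEBUG to an optional file) can be reproduced with the standard logging module alone. The sketch below is not how enable_default_logger is implemented; make_logger and the logger name "sketch" are hypothetical, and only the format string is taken from the signature above.

```python
import logging
import os
import sys
import tempfile
from typing import Optional

LOG_FORMAT = "[%(asctime)s %(levelname)s %(name)s] %(message)s"


def make_logger(name: str, log_file: Optional[str] = None) -> logging.Logger:
    """Build a logger with an INFO stderr handler and an optional DEBUG file."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)  # let each handler apply its own threshold
    logger.propagate = False
    formatter = logging.Formatter(LOG_FORMAT)

    console = logging.StreamHandler(sys.stderr)
    console.setLevel(logging.INFO)
    console.setFormatter(formatter)
    logger.addHandler(console)

    if log_file is not None:
        to_file = logging.FileHandler(log_file, encoding="utf-8")
        to_file.setLevel(logging.DEBUG)
        to_file.setFormatter(formatter)
        logger.addHandler(to_file)
    return logger


log_path = os.path.join(tempfile.mkdtemp(), "sketch.log")
logger = make_logger("sketch", log_file=log_path)
logger.debug("only in the file")
logger.info("on stderr and in the file")

with open(log_path, encoding="utf-8") as handle:
    file_lines = handle.read().splitlines()
```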
- class yapapi.log.SummaryLogger(wrapped_emitter=None)
Aggregates information from computation events to provide a high-level summary.
The logger’s log() method can be used as event_consumer callback in Golem initialization. It will aggregate the events generated and output some summary information.
The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with log() is first passed to wrapped_emitter.
For example, with the following setup, each event will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.
detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
golem = Golem(..., event_consumer=summary_logger)
- log(event)
Register an event.
- Return type
None
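The chaining described for wrapped_emitter (each event is passed to the wrapped emitter first, then processed by the logger itself) can be modelled in a few lines. CountingLogger is a hypothetical stand-in that tallies event names given as plain strings; it is not SummaryLogger and produces no summary messages.

```python
from collections import Counter
from typing import Callable, List, Optional


class CountingLogger:
    """Hypothetical stand-in: forwards each event, then updates a tally."""

    def __init__(self, wrapped_emitter: Optional[Callable[[str], None]] = None) -> None:
        self._wrapped_emitter = wrapped_emitter
        self.counts: Counter = Counter()

    def log(self, event: str) -> None:
        # The wrapped emitter sees the event before this logger handles it.
        if self._wrapped_emitter is not None:
            self._wrapped_emitter(event)
        self.counts[event] += 1


seen: List[str] = []
summary = CountingLogger(wrapped_emitter=seen.append)
for name in ["TaskStarted", "TaskFinished", "TaskStarted"]:
    summary.log(name)
```

Any callable that accepts an event can serve as the inner emitter, which is why a bound method such as seen.append works here.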
- yapapi.log.log_summary(wrapped_emitter=None)
Output a summary of computation.
This is a utility function that creates a SummaryLogger instance wrapping an optional wrapped_emitter and returns its log() method.
See the documentation of SummaryLogger for more information.
Utils
- yapapi.windows_event_loop_fix()
Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.
This work-around is only needed for Python 3.6 and 3.7. With Python 3.8, ProactorEventLoop is already the default on Windows.
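For readers who want to see what such a work-around can look like, here is a minimal sketch using only asyncio. It is not the body of windows_event_loop_fix(): the function name is hypothetical, and it relies on asyncio.WindowsProactorEventLoopPolicy, which exists only on Windows builds of Python 3.7 and later, so Python 3.6 would need a different approach.

```python
import asyncio
import sys


def use_proactor_loop_on_windows() -> bool:
    """Make new event loops use the proactor implementation on Windows.

    Returns True if the policy was changed, False on other platforms.
    """
    if sys.platform != "win32":
        return False
    # This policy class is only defined on Windows builds of Python.
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    return True


changed = use_proactor_loop_on_windows()
```

On non-Windows platforms the function is a no-op, so it can be called unconditionally at program start.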
- yapapi.get_version()
- Return type
str
- Returns
the version of the yapapi library package
Yapapi Contrib
Useful, reusable pieces of code built on top of yapapi.
This part of yapapi repository serves to provide requestor agent application authors with reusable components that we or third-party developers found useful while working on examples and apps that use our Python API.
They’re not strictly part of the high-level API library itself and, while we do intend to keep them in sync with the subsequent releases, they should be treated as experimental and their interface may change without a proper deprecation process.
Provider Filter
Market strategy wrapper that enables easy exclusion of offers from certain providers using a simple boolean condition, while preserving correct scoring of the remaining offers by the base strategy.
- class yapapi.contrib.strategy.ProviderFilter(base_strategy, is_allowed)
ProviderFilter - extend a market strategy with a layer that excludes offers from certain issuers
- Parameters
base_strategy (BaseMarketStrategy) – a market strategy that will be used to score offers from allowed providers
is_allowed (Union[Callable[[str], bool], Callable[[str], Awaitable[bool]]]) – a callable that accepts provider_id as an argument and returns either a boolean, or a boolean-returning awaitable, determining if offers from this provider should be considered (that is: scored by the base_strategy)
Example 1. Block selected providers:
bad_providers = ['bad_provider_1', 'bad_provider_2']
base_strategy = SomeStrategy(...)
strategy = ProviderFilter(base_strategy, lambda provider_id: provider_id not in bad_providers)
Example 2. Select providers using a database table:
# create an async database connection
# (sync would also work, but could hurt `yapapi` overall performance)
async_conn = ...

async def is_allowed(provider_id):
    result = await async_conn.execute("SELECT 1 FROM allowed_providers WHERE provider_id = ?", provider_id)
    return bool(result.fetchall())

base_strategy = SomeStrategy()
strategy = ProviderFilter(base_strategy, is_allowed)
Example 3. Use the default strategy, but disable every provider that fails to create an activity:
from yapapi import events

bad_providers = set()

def denying_event_consumer(event: events.Event):
    if isinstance(event, events.ActivityCreateFailed):
        bad_providers.add(event.provider_id)

golem = Golem(...)
golem.strategy = ProviderFilter(golem.strategy, lambda provider_id: provider_id not in bad_providers)
await golem.add_event_consumer(denying_event_consumer)

async with golem:
    ...

# NOTE: this will currently work only for **new** offers from the provider, because old offers are already
# scored, this should improve in https://github.com/golemfactory/yapapi/issues/820
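Since is_allowed may return either a boolean or an awaitable, a wrapper has to handle both cases before deciding whether to score an offer. The sketch below shows one common way to do that with inspect.isawaitable. It illustrates the technique only and is not ProviderFilter's code: filtered_score, the provider IDs and the SCORE_REJECTED placeholder value are assumptions.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, List, Union

SCORE_REJECTED = -1.0  # placeholder value for this sketch

IsAllowed = Union[Callable[[str], bool], Callable[[str], Awaitable[bool]]]


async def filtered_score(provider_id: str, base_score: float, is_allowed: IsAllowed) -> float:
    """Return base_score for allowed providers, SCORE_REJECTED otherwise."""
    allowed = is_allowed(provider_id)
    if inspect.isawaitable(allowed):
        # Async predicates hand back an awaitable that must be resolved.
        allowed = await allowed
    return base_score if allowed else SCORE_REJECTED


async def allowed_async(provider_id: str) -> bool:
    await asyncio.sleep(0)  # stands in for e.g. a database lookup
    return provider_id.startswith("good")


async def demo() -> List[float]:
    return [
        await filtered_score("good_1", 0.7, lambda pid: pid != "bad_1"),
        await filtered_score("bad_1", 0.7, lambda pid: pid != "bad_1"),
        await filtered_score("good_2", 0.4, allowed_async),
        await filtered_score("other", 0.4, allowed_async),
    ]


scores = asyncio.run(demo())
```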
Local HTTP Proxy
A local HTTP proxy that enables easy connection to any VPN-enabled, HTTP-based services launched on Golem providers using yapapi’s Services API.
For usage in a complete requestor agent app, see the http-proxy and webapp examples in the yapapi repository.
- class yapapi.contrib.service.http_proxy.LocalHttpProxy(cluster, port)
Runs a local aiohttp server and processes requests through instances of HttpProxyService.
- class yapapi.contrib.service.http_proxy.HttpProxyService(remote_port=80, remote_host=None, response_timeout=10.0)