Golem Python API Reference
Golem
- class yapapi.Golem(*, budget, strategy=None, subnet_tag=None, driver=None, payment_driver=None, network=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)
The main entrypoint of Golem's high-level API.
Its principal role is providing an interface to run the requestor’s payload using one of two modes of operation - executing tasks and running services.
The first one, available through
execute_tasks()
, instructsGolem
to take a sequence of tasks that the user wishes to compute on Golem and distributes those among the providers.The second one, invoked with
run_service()
, makesGolem
spawn a certain number of instances of a service based on a single service specification (a specialized implementation inheriting fromService
).While the two modes are not necessarily completely disjoint - in that we can create a service that exists to process a certain number of computations and, similarly, we can use the task model to run some service - the main difference lies in the lifetime of such a job.
Whereas a task-based job exists for the purpose of computing the specific sequence of tasks and is done once the whole sequence has been processed, the service-based job is created for a potentially indefinite period and the services spawned within it are kept alive for as long as they’re needed.
Additionally, the service interface provides a way to easily define handlers for certain, discrete phases of a lifetime of each service instance - startup, running and shutdown.
Internally,
Golem
’s job includes running the engine which takes care of first finding the providers interested in the jobs the requestors want to execute, then negotiating agreements with them and facilitating the execution of those jobs and lastly, processing payments. For this reason, it’s usually good to have just one instance ofGolem
operative at any given time.- __init__(*, budget, strategy=None, subnet_tag=None, driver=None, payment_driver=None, network=None, payment_network=None, event_consumer=None, stream_output=False, app_key=None)
Initialize Golem engine.
- Parameters
budget (
Union
[float
,Decimal
]) – maximum budget for paymentsstrategy (
Optional
[MarketStrategy
]) – market strategy used to select providers from the market (e.g.yapapi.strategy.LeastExpensiveLinearPayuMS
oryapapi.strategy.DummyMS
)subnet_tag (
Optional
[str
]) – use only providers in the subnet with the subnet_tag name. Uses YAGNA_SUBNET environment variable, defaults to Nonedriver (
Optional
[str
]) – deprecated, please use payment_driver insteadpayment_driver (
Optional
[str
]) – name of the payment driver to use. Uses YAGNA_PAYMENT_DRIVER environment variable, defaults to zksync. Only payment platforms with the specified driver will be usednetwork (
Optional
[str
]) – deprecated, please use payment_network insteadpayment_network (
Optional
[str
]) – name of the network to use. Uses YAGNA_NETWORK environment variable, defaults to rinkeby. Only payment platforms with the specified network will be usedevent_consumer (
Optional
[Callable
[[Event
],None
]]) – a callable that processes events related to the computation; by default it is a function that logs all eventsstream_output (
bool
) – stream computation output from providersapp_key (
Optional
[str
]) – optional Yagna application key. If not provided, the default is to get the value from YAGNA_APPKEY environment variable
- async create_network(ip, owner_ip=None, mask=None, gateway=None)
Create a VPN inside Golem network.
Requires yagna >= 0.8
- Parameters
ip (
str
) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”owner_ip (
Optional
[str
]) – the desired IP address of the requestor node within the newly-created Networkmask (
Optional
[str
]) – Optional netmask (only if not provided within the ip argument)gateway (
Optional
[str
]) – Optional gateway address for the network
- Return type
- execute_tasks(worker, data, payload, max_workers=None, timeout=None, job_id=None, implicit_init=True)
Submit a sequence of tasks to be executed on providers.
Internally, this method creates an instance of
yapapi.executor.Executor
and calls itssubmit()
method with given worker function and sequence of tasks.- Parameters
worker (
Callable
[[WorkContext
,AsyncIterator
[Task
[~D, ~R]]],AsyncGenerator
[Script
,Awaitable
[List
[CommandEvent
]]]]) – an async generator that takes aWorkContext
object and a sequence of tasks, and generates as sequence of scripts to be executed on providers in order to compute given tasksdata (
Union
[AsyncIterator
[Task
[~D, ~R]],Iterable
[Task
[~D, ~R]]]) – an iterable or an async generator ofTask
objects to be computed on providerspayload (
Payload
) – specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the createdExecutor
instancemax_workers (
Optional
[int
]) – maximum number of concurrent workers, passed to theExecutor
instancetimeout (
Optional
[timedelta
]) – timeout for computing all tasks, passed to theExecutor
instancejob_id (
Optional
[str
]) – an optional string to identify the job created by this method. Passed as the value of the id parameter toyapapi.engine.Job
.implicit_init (
bool
) – True ->deploy()
andstart()
will be called internally by theExecutor
. False -> those calls must be in the worker function
- Return type
AsyncIterator
[Task
[~D, ~R]]- Returns
an async iterator that yields completed Task objects
example usage:
async def worker(context: WorkContext, tasks: AsyncIterable[Task]): async for task in tasks: context.run("/bin/sh", "-c", "date") future_results = yield context.commit() results = await future_results task.accept_result(result=results[-1]) package = await vm.repo( image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", ) async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package): print(completed.result.stdout)
- async run_service(service_class, num_instances=None, instance_params=None, payload=None, expiration=None, respawn_unstarted_instances=True, network=None, network_addresses=None)
Run a number of instances of a service represented by a given
Service
subclass.- Parameters
service_class (
Type
[Service
]) – a subclass ofService
that represents the service to be runnum_instances (
Optional
[int
]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, theCluster
will be created with as many instances as there are elements in the instance_params iterable. if num_instances is set to < 1, theCluster
will still be created but no instances will be spawned within it.instance_params (
Optional
[Iterable
[Dict
]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, theCluster
will be created with the number of instances determined by num_instances and if there are too few elements in the instance_params iterable, it will results in an error.payload (
Optional
[Payload
]) – optional runtime definition for the service; if not provided, the payload specified by theget_payload()
method of service_class is usedexpiration (
Optional
[datetime
]) – optional expiration datetime for the servicerespawn_unstarted_instances – if an instance fails in the starting state, should the returned
Cluster
try to spawn another instancenetwork (
Optional
[Network
]) – optionalNetwork
, representing a VPN to attach thisCluster
’s instances tonetwork_addresses (
Optional
[List
[str
]]) – optional list of addresses to assign to consecutive spawned instances. If there are too few addresses given in the network_addresses iterable to satisfy all spawned instances, the rest (or all when the list is empty or not provided at all) of the addresses will be assigned automatically. Requires the network argument to be provided at the same time.
example usage:
DATE_OUTPUT_PATH = "/golem/work/date.txt" REFRESH_INTERVAL_SEC = 5 class DateService(Service): @staticmethod async def get_payload(): return await vm.repo( image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", ) async def start(self): async for script in super().start(): yield script # every `DATE_POLL_INTERVAL` write output of `date` to `DATE_OUTPUT_PATH` script = self._ctx.new_script() script.run( "/bin/sh", "-c", f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &", ) yield script async def run(self): while True: await asyncio.sleep(REFRESH_INTERVAL_SEC) script = self._ctx.new_script() future_result = script.run( "/bin/sh", "-c", f"cat {DATE_OUTPUT_PATH}", ) yield script result = (await future_result).stdout print(result.strip() if result else "") async def main(): async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: cluster = await golem.run_service(DateService, num_instances=1) start_time = datetime.now() while datetime.now() < start_time + timedelta(minutes=1): for num, instance in enumerate(cluster.instances): print(f"Instance {num} is {instance.state.value} on {instance.provider_name}") await asyncio.sleep(REFRESH_INTERVAL_SEC)
- Return type
Task API
Task
- class yapapi.Task(data)
One computation unit.
Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).
- __init__(data)
Create a new
Task
object.- Parameters
data (~TaskData) – contains information needed to prepare command list for the provider
- accept_result(result=None)
Accept the result of this task.
Must be called when the result is correct to mark this task as completed.
- Parameters
result (
Optional
[~TaskResult]) – task computation result (optional)- Return type
None
- reject_result(reason=None, retry=False)
Reject the result of this task.
Must be called when the result is not correct to indicate that the task should be retried.
- Parameters
reason (
Optional
[str
]) – task rejection description (optional)- Return type
None
- property running_time: Optional[datetime.timedelta]
Return the running time of the task (if in progress) or time it took to complete it.
- Return type
Optional
[timedelta
]
Service API
Service
- class yapapi.services.Service(cluster, ctx, network_node=None)
Base class for service specifications.
To be extended by application developers to define their own, specialized Service specifications.
- async static get_payload()
Return the payload (runtime) definition for this service.
To be overridden by the author of a specific
Service
class.If
get_payload()
is not implemented, the payload will need to be provided in therun_service()
call.- Return type
Optional
[Payload
]
- property id: str
Return the id of this service instance.
Guaranteed to be unique within a
Cluster
.- Return type
str
- property is_available
Return True iff this instance is available (that is, starting, running or stopping).
- property network: Optional[yapapi.network.Network]
Return the
Network
to which this instance belongs (if any)- Return type
Optional
[Network
]
- property network_node: Optional[yapapi.network.Node]
Return the network
Node
record associated with this instance.- Return type
Optional
[Node
]
- property provider_name: Optional[str]
Return the name of the provider that runs this service instance.
- Return type
Optional
[str
]
- async receive_message()
Wait for a control message sent to this instance.
- Return type
ServiceSignal
- receive_message_nowait()
Retrieve a control message sent to this instance.
Return None if no message is available.
- Return type
Optional
[ServiceSignal
]
- run()
Implement the handler for the running state of the service.
To be overridden by the author of a specific
Service
class.Should contain any operations needed to ensure continuous operation of a service.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields
Script
(generated using the service’s instance of theWorkContext
-self._ctx
) that are then dispatched to the activity by the engine.Results of those batches can then be retrieved by awaiting the values captured from yield statements.
A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to stopping.
Any unhandled exception will cause the instance to be terminated.
Example:
async def run(self): while True: script = self._ctx.new_script() stats_results = script.run(self.SIMPLE_SERVICE, "--stats") yield script stats = (await stats_results).stdout.strip() print(f"stats: {stats}")
Default implementation
Because the nature of the operations required during the “running” state depends directly on the specifics of a given
Service
and because it’s entirely plausible for a service not to require any direct interaction with the exe-unit (runtime) from the requestor’s end after the service has been started, the default is to just wait indefinitely without producing any batches.- Return type
AsyncGenerator
[Script
,Awaitable
[List
[CommandEvent
]]]
- async send_message(message=None)
Send a control message to this instance.
- send_message_nowait(message=None)
Send a control message to this instance without blocking.
May raise
asyncio.QueueFull
if the channel for sending control messages is full.
- shutdown()
Implement the handler for the stopping state of the service.
To be overridden by the author of a specific
Service
class.Should contain any operations that the requestor needs to ensure the instance is correctly and gracefully shut-down - e.g. that its final state is retrieved.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields
Script
(generated using the service’s instance of theWorkContext
-self._ctx
) that are then dispatched to the activity by the engine.Results of those batches can then be retrieved by awaiting the values captured from yield statements.
Finishing the execution of this handler will trigger termination of this instance.
This handler will only be called if the activity running the service is still available. If the activity has already been deemed terminated or if the connection with the provider has been lost, the service will transition to the terminated state and the shutdown handler won’t be run.
Example:
async def shutdown(self): self._ctx.run("/golem/run/dump_state") self._ctx.download_file("/golem/output/state", "/some/local/path/state") self._ctx.terminate() yield self._ctx.commit()
Default implementation
By default, the activity is just sent a terminate command. Whether it’s absolutely required or not, again, depends on the implementation of the given runtime.
- Return type
AsyncGenerator
[Script
,Awaitable
[List
[CommandEvent
]]]
- start()
Implement the handler for the starting state of the service.
To be overridden by the author of a specific
Service
class.Should perform the minimum set of operations after which the instance of a service can be treated as “started”, or, in other words, ready to receive service requests. It’s up to the developer of the specific
Service
class to decide what exact operations constitute a service startup. In the most common scenariodeploy()
andstart()
are required, check the Default implementation section for more details.As a handler implementing the work generator pattern, it’s expected to be a generator that yields
Script
(generated using the service’s instance of theWorkContext
-self._ctx
) that are then dispatched to the activity by the engine.Results of those batches can then be retrieved by awaiting the values captured from yield statements.
A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to running.
On the other hand, any unhandled exception will cause the instance to be either retried on another provider node, if the
Cluster
’srespawn_unstarted_instances
argument is set to True inrun_service()
, which is also the default behavior, or altogether terminated, ifrespawn_unstarted_instances
is set to False.Example:
async def start(self): s = self._ctx.new_script() # deploy the exe-unit s.deploy(**self.get_deploy_args()) # start the exe-unit's container s.start() # start some service process within the container s.run("/golem/run/service_ctl", "--start") # send the batch to the provider yield s
### Default implementation
The default implementation assumes that, in order to accept commands, the runtime needs to be first deployed using the
deploy()
command, which is analogous to creation of a container corresponding with the desired payload, and then started using thestart()
command, actually launching the process that runs the aforementioned container.Additionally, it also assumes that the exe-unit doesn’t need any additional parameters in its
start()
call (e.g. for the VM runtime, all the required parameters are already passed as part of the agreement between the requestor and the provider), and parameters passed todeploy()
are returned byService.get_deploy_args()
method.Therefore, this default implementation performs the minimum required for a VM payload to start responding to run commands. If your service requires any additional operations - you’ll need to override this method (possibly first yielding from the parent - super().start() - generator) to add appropriate preparatory steps.
In case of runtimes other than VM, deploy and/or start might be optional or altogether disallowed, or they may take some parameters. It is up to the author of the specific Service implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type.
- Return type
AsyncGenerator
[Script
,Awaitable
[List
[CommandEvent
]]]
- property state
Return the current state of this instance.
Cluster
- class yapapi.services.Cluster(engine, service_class, payload, expiration=None, respawn_unstarted_instances=True, network=None)
Golem’s sub-engine used to spawn and control instances of a single
Service
.- property expiration: datetime.datetime
Return the expiration datetime for agreements related to services in this
Cluster
.- Return type
datetime
- property instances: List[yapapi.services.Service]
Return the list of service instances in this
Cluster
.- Return type
List
[Service
]
- property network: Optional[yapapi.network.Network]
Return the
Network
record associated with the VPN used by thisCluster
.- Return type
Optional
[Network
]
- property payload: yapapi.payload.Payload
Return the service runtime definition for this
Cluster
.- Return type
- property service_class: Type[yapapi.services.Service]
Return the class instantiated by all service instances in this
Cluster
.- Return type
Type
[Service
]
- async spawn_instance(params, network_address=None)
Spawn a new service instance within this
Cluster
.- Return type
None
- spawn_instances(num_instances=None, instance_params=None, network_addresses=None)
Spawn new instances within this
Cluster
.- Parameters
num_instances (
Optional
[int
]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, theCluster
will spawn as many instances as there are elements in the instance_params iterable. if num_instances is not None and < 1, the method will immediately return and log a warning.instance_params (
Optional
[Iterable
[Dict
]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the number of instances spawned will be equal to num_instances and if there are too few elements in the instance_params iterable, it will results in an error.network_addresses (
Optional
[List
[str
]]) – optional list of network addresses in case theCluster
is attached to VPN. If the list is not provided (or if the number of elements is less than the number of spawned instances), any instances for which the addresses have not been given, will be assigned an address automatically.
- Return type
None
ServiceState
Network API
Network
- class yapapi.network.Network(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)
Describes a VPN created between the requestor and the provider nodes within Golem Network.
- async add_node(node_id, ip=None)
Add a new node to the network.
- Parameters
node_id (
str
) – Node ID within the Golem network of this VPN node.ip (
Optional
[str
]) – IP address to assign to this node.
- Return type
- async add_owner_address(ip)
Assign the given IP address to the requestor in the network.
- Parameters
ip (
str
) – the IP address to assign to the requestor node.
- async classmethod create(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)
Create a new VPN.
- Parameters
net_api (
Net
) – the mid-level binding used directly to perform calls to the REST API.ip (
str
) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”owner_id (
str
) – the node ID of the owner of this VPN (the requestor)owner_ip (
Optional
[str
]) – the desired IP address of the requestor node within the newly-created networkmask (
Optional
[str
]) – Optional netmask (only if not provided within the ip argument)gateway (
Optional
[str
]) – Optional gateway address for the network
- Return type
- property gateway: Optional[str]
The gateway address within this network, if provided.
- Return type
Optional
[str
]
- property netmask: str
The netmask of this network.
- Return type
str
- property network_address: str
The network address of this network, without a netmask.
- Return type
str
- property network_id: str
The automatically-generated, unique ID of this VPN.
- Return type
str
- property nodes_dict: Dict[str, str]
Mapping between the IP addresses and Node IDs of the nodes within this network.
- Return type
Dict
[str
,str
]
- property owner_ip: str
The IP address of the requestor node within the network.
- Return type
str
Node
- class yapapi.network.Node(network, node_id, ip)
Describes a node in a VPN, mapping a Golem node id to an IP address.
- get_deploy_args()
Generate a dictionary of arguments that are required for the appropriate Deploy command of an exescript in order to pass the network configuration to the runtime on the provider’s end.
- Return type
Dict
- ip: str
IP address of this node in this particular VPN.
- network: yapapi.network.Network
The
Network
(the specific VPN) this node is part of.
- node_id: str
Golem id of the node.
Exceptions
Payload definition
Payload
- class yapapi.payload.Payload(**kwargs)
Base class for descriptions of the payload required by the requestor.
example usage:
import asyncio from dataclasses import dataclass from yapapi.props.builder import DemandBuilder from yapapi.props.base import prop, constraint from yapapi.props import inf from yapapi.payload import Payload CUSTOM_RUNTIME_NAME = "my-runtime" CUSTOM_PROPERTY = "golem.srv.app.myprop" @dataclass class MyPayload(Payload): myprop: str = prop(CUSTOM_PROPERTY, default="myvalue") runtime: str = constraint(inf.INF_RUNTIME_NAME, default=CUSTOM_RUNTIME_NAME) min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16) min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024) async def main(): builder = DemandBuilder() payload = MyPayload(myprop="othervalue", min_mem_gib=32) await builder.decorate(payload) print(builder) asyncio.run(main())
output:
{'properties': {'golem.srv.app.myprop': 'othervalue'}, 'constraints': ['(&(golem.runtime.name=my-runtime) (golem.inf.mem.gib>=32) (golem.inf.storage.gib>=1024))']}
Package
- class yapapi.payload.package.Package
Description of a task package (e.g. a VM image) deployed on the provider nodes
vm.repo
- async yapapi.payload.vm.repo(*, image_hash, image_url=None, min_mem_gib=0.5, min_storage_gib=2.0, min_cpu_threads=1, capabilities=None)
Build a reference to application package.
- Parameters
image_hash (
str
) – hash of the package’s imageimage_url (
Optional
[str
]) – URL of the package’s imagemin_mem_gib (
float
) – minimal memory required to execute application codemin_storage_gib (
float
) – minimal disk storage to execute tasksmin_cpu_threads (
int
) – minimal available logical CPU corescapabilities (
Optional
[List
[Literal
[‘vpn’]]]) – an optional list of required vm capabilities
- Return type
- Returns
the payload definition for the given VM image
Execution control
WorkContext
- class yapapi.WorkContext(activity, agreement_details, storage, emitter=None)
Provider node’s work context.
Used to schedule commands to be sent to the provider and enable other interactions between the requestor agent’s client code and the activity on provider’s end.
- async get_cost()
Get the accumulated cost of the activity based on the reported usage.
- Return type
Optional
[float
]
- async get_raw_state()
Get the state activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
- Return type
ActivityState
- async get_raw_usage()
Get the raw usage vector for the activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
- Return type
ActivityUsage
- async get_usage()
Get the current usage for the activity bound to this work context.
- Return type
ActivityUsage
- property id: str
Unique identifier for this work context.
- Return type
str
- new_script(timeout=None, wait_for_results=True)
Create an instance of
Script
attached to thisWorkContext
instance.This is equivalent to calling Script(work_context). This method is intended to provide a direct link between the two object instances.
- Return type
- property provider_id: str
Return the id of the provider associated with this work context.
- Return type
str
- property provider_name: Optional[str]
Return the name of the provider associated with this work context.
- Return type
Optional
[str
]
Script
- class yapapi.script.Script(context, timeout=None, wait_for_results=True)
Represents a series of commands to be executed on a provider node.
New commands are added to the script either through its
add()
method or by calling one of the convenience methods provided (for example:run()
orupload_json()
). Adding a new command does not result in it being immediately executed. Once ready, aScript
instance is meant to be yielded from a worker function (work generator pattern). Commands will be run in the order in which they were added to the script.- __init__(context, timeout=None, wait_for_results=True)
Initialize a
Script
- Parameters
context (
WorkContext
) – Ayapapi.WorkContext
that will be used to evaluate the script (i.e. to send commands to the provider)timeout (
Optional
[timedelta
]) – Time after which this script’s execution should be forcefully interrupted. The default value is None which means there’s no timeout set.wait_for_results (
bool
) – Whether this script’s execution should block until its results are available. The default value is True.
- deploy(**kwargs)
Schedule a
Deploy
command on the provider.- Return type
Awaitable
[CommandExecuted
]
- download_bytes(src_path, on_download, limit=1048576)
Schedule downloading a remote file from the provider as bytes.
- Parameters
src_path (
str
) – remote (provider) source pathon_download (
Callable
[[bytes
],Awaitable
]) – the callable to run on the received datalimit (
int
) – limit of bytes to be downloaded (expected size)
- Return type
Awaitable
[CommandExecuted
]
- download_file(src_path, dst_path)
Schedule downloading a remote file from the provider.
- Parameters
src_path (
str
) – remote (provider) source pathdst_path (
str
) – local (requestor) destination path
- Return type
Awaitable
[CommandExecuted
]
- download_json(src_path, on_download, limit=1048576)
Schedule downloading a remote file from the provider as JSON.
- Parameters
src_path (
str
) – remote (provider) source pathon_download (
Callable
[[Any
],Awaitable
]) – the callable to run on the received datalimit (
int
) – limit of bytes to be downloaded (expected size)
- Return type
Awaitable
[CommandExecuted
]
- property id: int
Return the ID of this
Script
instance.IDs are provided by a global iterator and therefore are guaranteed to be unique during the program’s execution.
- Return type
int
- run(cmd, *args, env=None, stderr=None, stdout=None)
Schedule running a shell command on the provider.
- Parameters
cmd (
str
) – command to run on the provider, e.g. /my/dir/run.shargs (
str
) – command arguments, e.g. “input1.txt”, “output1.txt”env (
Optional
[Dict
[str
,str
]]) – optional dictionary with environment variablesstderr (
Optional
[CaptureContext
]) – capture context to use for stderrstdout (
Optional
[CaptureContext
]) – capture context to use for stdout
- Return type
Awaitable
[CommandExecuted
]
- start(*args)
Schedule a
Start
command on the provider.- Return type
Awaitable
[CommandExecuted
]
- terminate()
Schedule a
Terminate
command on the provider.- Return type
Awaitable
[CommandExecuted
]
- upload_bytes(data, dst_path)
Schedule sending bytes data to the provider.
- Parameters
data (
bytes
) – bytes to senddst_path (
str
) – remote (provider) destination path
- Return type
Awaitable
[CommandExecuted
]
- upload_file(src_path, dst_path)
Schedule sending a file to the provider.
- Parameters
src_path (
str
) – local (requestor) source pathdst_path (
str
) – remote (provider) destination path
- Return type
Awaitable
[CommandExecuted
]
- upload_json(data, dst_path)
Schedule sending JSON data to the provider.
- Parameters
data (
dict
) – dictionary representing JSON data to senddst_path (
str
) – remote (provider) destination path
- Return type
Awaitable
[CommandExecuted
]
Market strategies
- class yapapi.strategy.MarketStrategy
Abstract market strategy.
- async decorate_demand(demand)
Optionally add relevant constraints to a Demand.
- Return type
None
- async score_offer(offer, history=None)
Score offer. Better offers should get higher scores.
- Return type
float
- class yapapi.strategy.DummyMS(max_fixed_price=Decimal('0.05'), max_price_for=mappingproxy({}), activity=None)
A default market strategy implementation.
Its
score_offer()
method returnsSCORE_NEUTRAL
for every offer with prices that do not exceed maximum prices specified for each counter. For other offers, returnsSCORE_REJECTED
.
- class yapapi.strategy.LeastExpensiveLinearPayuMS(expected_time_secs=60, max_fixed_price=Decimal('Infinity'), max_price_for=mappingproxy({}))
A strategy that scores offers according to cost for given computation time.
- class yapapi.strategy.DecreaseScoreForUnconfirmedAgreement(base_strategy, factor)
A market strategy that modifies a base strategy based on history of agreements.
Exceptions
- exception yapapi.NoPaymentAccountError(required_driver, required_network)
The error raised if no payment account for the required driver/network is available.
- exception yapapi.rest.activity.BatchTimeoutError
An exception that indicates that an execution of a batch of commands timed out.
Logging
- class yapapi.log.SummaryLogger(wrapped_emitter=None)
Aggregates information from computation events to provide a high-level summary.
The logger’s
log()
method can be used as event_consumer callback inGolem
initialization. It will aggregate the events generated and output some summary information.The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with
log()
is first passed to wrapped_emitter.For example, with the following setup, each event will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.
detailed_logger = log_event_repr summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log golem = Golem(..., event_consumer=summary_logger)
- log(event)
Register an event.
- Return type
None
- yapapi.log.enable_default_logger(format_='[%(asctime)s %(levelname)s %(name)s] %(message)s', log_file=None, debug_activity_api=False, debug_market_api=False, debug_payment_api=False, debug_net_api=False)
Enable the default logger that logs messages to stderr with level INFO.
If log_file is specified, the logger with output messages with level DEBUG to the given file.
- yapapi.log.log_summary(wrapped_emitter=None)
Output a summary of computation.
This is a utility function that creates a
SummaryLogger
instance wrapping an optional wrapped_emitter and returns itslog()
method.See the documentation of
SummaryLogger
for more information.
Utils
- yapapi.windows_event_loop_fix()
Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.
This work-around is only needed for Python 3.6 and 3.7. With Python 3.8, ProactorEventLoop is already the default on Windows.
- yapapi.get_version()
- Return type
str
- Returns
the version of the yapapi library package