Golem Python API Reference
Golem
- class yapapi.Golem(*, budget, strategy=None, subnet_tag=None, payment_driver=None, payment_network=None, event_consumer=None, stream_output=False, api_config=None, app_key=None)
The main entrypoint of Golem’s high-level API.
Its principal role is providing an interface to run the requestor’s payload using one of two modes of operation - executing tasks and running services.
The first one, available through execute_tasks(), instructs Golem to take a sequence of tasks that the user wishes to compute on Golem and distribute those among the providers.
The second one, invoked with run_service(), makes Golem spawn a certain number of instances of a service based on a single service specification (a specialized implementation inheriting from Service).
While the two modes are not necessarily completely disjoint - in that we can create a service that exists to process a certain number of computations and, similarly, we can use the task model to run some service - the main difference lies in the lifetime of such a job.
Whereas a task-based job exists for the purpose of computing the specific sequence of tasks and is done once the whole sequence has been processed, the service-based job is created for a potentially indefinite period and the services spawned within it are kept alive for as long as they’re needed.
Additionally, the service interface provides a way to easily define handlers for certain, discrete phases of a lifetime of each service instance - startup, running and shutdown.
Internally, Golem’s job includes running the engine which takes care of first finding the providers interested in the jobs the requestors want to execute, then negotiating agreements with them and facilitating the execution of those jobs and lastly, processing payments. For this reason, it’s usually good to have just one instance of Golem operative at any given time.
- __init__(*, budget, strategy=None, subnet_tag=None, payment_driver=None, payment_network=None, event_consumer=None, stream_output=False, api_config=None, app_key=None)
Initialize Golem engine.
- Parameters
budget (Union[float, Decimal]) – maximum budget for payments
strategy (Optional[BaseMarketStrategy]) – market strategy used to select providers from the market (e.g. yapapi.strategy.LeastExpensiveLinearPayuMS or yapapi.strategy.DummyMS)
subnet_tag (Optional[str]) – use only providers in the subnet with the subnet_tag name. Uses YAGNA_SUBNET environment variable, defaults to None
payment_driver (Optional[str]) – name of the payment driver to use. Uses YAGNA_PAYMENT_DRIVER environment variable, defaults to erc20. Only payment platforms with the specified driver will be used
payment_network (Optional[str]) – name of the network to use. Uses YAGNA_PAYMENT_NETWORK environment variable, defaults to goerli. Only payment platforms with the specified network will be used
event_consumer (Optional[Callable[[Event], None]]) – a callable that processes events related to the computation; by default it is a function that logs all events
stream_output (bool) – stream computation output from providers
api_config (Optional[ApiConfig]) – configuration of yagna low level api including but not limited to YAGNA_APPKEY, YAGNA_API_URL variables. See yapapi.config.ApiConfig docs for more details
app_key (Optional[str]) – optional Yagna application key. If not provided, the default is to get the value from YAGNA_APPKEY environment variable
- add_event_consumer(event_consumer, event_classes_or_names=(<class 'yapapi.events.Event'>, ))
Initialize another event_consumer, working just like the event_consumer passed to Golem.__init__().
- Parameters
event_consumer (Callable[[Event], None]) – A callable that will be executed on every event.
event_classes_or_names (Iterable[Union[Type[Event], str]]) – An iterable defining classes of events that should be passed to this event_consumer. Both classes and class names are accepted (in the latter case classes must be available in the yapapi.events namespace). If this argument is omitted, all events inheriting from yapapi.events.Event (i.e. all currently implemented events) will be passed to the event_consumer.
Example usages:
    def event_consumer(event: "yapapi.events.Event"):
        print(f"Got an event! {type(event).__name__}")

    golem.add_event_consumer(event_consumer)

    def event_consumer(event: "yapapi.events.AgreementConfirmed"):
        provider_name = event.agreement.details.provider_node_info.name
        print(f"We're trading with {provider_name}! Nice!")

    golem.add_event_consumer(event_consumer, ["AgreementConfirmed"])
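The filtering rule described for event_classes_or_names can be pictured with a small stand-alone sketch. It does not use yapapi: Event, AgreementConfirmed, TaskStarted, EVENT_NAMESPACE and make_filtered_consumer are hypothetical stand-ins, and matching subclasses through isinstance is an assumption based on the wording "all events inheriting from".

```python
from typing import Callable, Dict, Iterable, List, Type, Union


class Event:
    """Stand-in for a base event class (hypothetical, not yapapi's)."""


class AgreementConfirmed(Event):
    pass


class TaskStarted(Event):
    pass


# Stand-in for the namespace in which class names are looked up.
EVENT_NAMESPACE: Dict[str, Type[Event]] = {
    cls.__name__: cls for cls in (Event, AgreementConfirmed, TaskStarted)
}


def make_filtered_consumer(
    consumer: Callable[[Event], None],
    classes_or_names: Iterable[Union[Type[Event], str]] = (Event,),
) -> Callable[[Event], None]:
    """Wrap `consumer` so that it only receives events of the selected classes."""
    # Resolve class names to classes once, up front.
    selected = tuple(
        EVENT_NAMESPACE[item] if isinstance(item, str) else item
        for item in classes_or_names
    )

    def filtered(event: Event) -> None:
        if isinstance(event, selected):
            consumer(event)

    return filtered


seen: List[str] = []
only_agreements = make_filtered_consumer(
    lambda event: seen.append(type(event).__name__), ["AgreementConfirmed"]
)
for incoming in (TaskStarted(), AgreementConfirmed()):
    only_agreements(incoming)
```

With the default argument, the wrapper passes every event through, which mirrors the behavior described when the argument is omitted.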
- async start()
Start the Golem engine in non-contextmanager mode.
The default way of using Golem:
    async with Golem(...) as golem:
        # ... work with golem

is roughly equivalent to:

    golem = Golem(...)
    try:
        await golem.start()
        # ... work with golem
    finally:
        await golem.stop()
A repeated call to Golem.start():
- If Golem is already starting, or started and wasn’t stopped - will be ignored (and harmless)
- If Golem was stopped - will initialize a new engine that knows nothing about the previous operations
- Return type
None
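The start/stop semantics described for Golem.start() can be illustrated with a toy class. ToyEngine is a hypothetical stand-in that only mimics the documented behavior (a repeated start is ignored, a start after stop creates a fresh engine); it is not how yapapi implements it.

```python
import asyncio
from typing import List


class ToyEngine:
    """Hypothetical stand-in for an object with start/stop semantics."""

    def __init__(self) -> None:
        self.generation = 0  # counts how many fresh engines were created
        self.running = False

    async def start(self) -> None:
        if self.running:
            return  # repeated start while running: ignored
        self.generation += 1  # start after stop: a brand new engine
        self.running = True

    async def stop(self) -> None:
        self.running = False

    async def __aenter__(self) -> "ToyEngine":
        await self.start()
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self.stop()


async def demo_engine() -> List[int]:
    generations = []
    async with ToyEngine() as engine:  # context-manager mode
        generations.append(engine.generation)
    engine = ToyEngine()  # explicit (non-contextmanager) mode
    try:
        await engine.start()
        await engine.start()  # ignored, the engine is already running
        generations.append(engine.generation)
    finally:
        await engine.stop()
    await engine.start()  # after stop: a new engine is initialized
    generations.append(engine.generation)
    await engine.stop()
    return generations


engine_generations = asyncio.run(demo_engine())
```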
- async stop(wait_for_payments=True)
Stop the Golem engine after it was started in non-contextmanager mode.
See Golem.start() for details.
- Return type
None
- async execute_tasks(worker, data, payload, max_workers=None, timeout=None, job_id=None, implicit_init=True)
Submit a sequence of tasks to be executed on providers.
Internally, this method creates an instance of yapapi.executor.Executor and calls its submit() method with given worker function and sequence of tasks.
- Parameters
worker (Callable[[WorkContext, AsyncIterator[Task[TypeVar(D), TypeVar(R)]]], AsyncGenerator[Script, Awaitable[List[CommandEvent]]]]) – an async generator that takes a WorkContext object and a sequence of tasks, and generates a sequence of scripts to be executed on providers in order to compute given tasks
data (Union[AsyncIterator[Task[TypeVar(D), TypeVar(R)]], Iterable[Task[TypeVar(D), TypeVar(R)]]]) – an iterable or an async generator of Task objects to be computed on providers
payload (Payload) – specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the created Executor instance
max_workers (Optional[int]) – maximum number of concurrent workers, passed to the Executor instance
timeout (Optional[timedelta]) – timeout for computing all tasks, passed to the Executor instance
job_id (Optional[str]) – an optional string to identify the job created by this method. Passed as the value of the id parameter to yapapi.engine.Job.
implicit_init (bool) – True -> deploy() and start() will be called internally by the Executor. False -> those calls must be in the worker function
- Return type
AsyncIterator[Task[TypeVar(D), TypeVar(R)]]
- Returns
an async iterator that yields completed Task objects
example usage:
    async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
        async for task in tasks:
            script = context.new_script()
            future_result = script.run("/bin/sh", "-c", "date")
            yield script
            task.accept_result(result=await future_result)

    package = await vm.repo(
        image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
    )

    async with Golem(budget=1.0, subnet_tag="public") as golem:
        async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
            print(completed.result.stdout)
- async run_service(service_class, num_instances=None, instance_params=None, payload=None, expiration=None, network=None, network_addresses=None)
Run a number of instances of a service represented by a given Service subclass.
- Parameters
service_class (Type[TypeVar(ServiceType, bound=Service)]) – a subclass of Service that represents the service to be run
num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will be created with as many instances as there are elements in the instance_params iterable. If num_instances is set to < 1, the Cluster will still be created but no instances will be spawned within it.
instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the Cluster will be created with the number of instances determined by num_instances and if there are too few elements in the instance_params iterable, it will result in an error.
payload (Optional[Payload]) – optional runtime definition for the service; if not provided, the payload specified by the get_payload() method of service_class is used
expiration (Optional[datetime]) – optional expiration datetime for the service
network (Optional[Network]) – optional Network, representing a VPN to attach this Cluster’s instances to
network_addresses (Optional[List[str]]) – optional list of addresses to assign to consecutive spawned instances. If there are too few addresses given in the network_addresses iterable to satisfy all spawned instances, the rest (or all when the list is empty or not provided at all) of the addresses will be assigned automatically. Requires the network argument to be provided at the same time.
example usage:
    import asyncio
    from datetime import datetime, timedelta

    from yapapi import Golem
    from yapapi.payload import vm
    from yapapi.services import Service

    DATE_OUTPUT_PATH = "/golem/work/date.txt"
    REFRESH_INTERVAL_SEC = 5

    class DateService(Service):
        @staticmethod
        async def get_payload():
            return await vm.repo(
                image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
            )

        async def start(self):
            async for script in super().start():
                yield script

            # every `REFRESH_INTERVAL_SEC` write output of `date` to `DATE_OUTPUT_PATH`
            script = self._ctx.new_script()
            script.run(
                "/bin/sh",
                "-c",
                f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
            )
            yield script

        async def run(self):
            while True:
                await asyncio.sleep(REFRESH_INTERVAL_SEC)
                script = self._ctx.new_script()
                future_result = script.run(
                    "/bin/sh",
                    "-c",
                    f"cat {DATE_OUTPUT_PATH}",
                )

                yield script

                result = (await future_result).stdout
                print(result.strip() if result else "")

    async def main():
        async with Golem(budget=1.0, subnet_tag="public") as golem:
            cluster = await golem.run_service(DateService, num_instances=1)
            start_time = datetime.now()

            while datetime.now() < start_time + timedelta(minutes=1):
                for num, instance in enumerate(cluster.instances):
                    print(f"Instance {num} is {instance.state.value} on {instance.provider_name}")
                await asyncio.sleep(REFRESH_INTERVAL_SEC)
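The precedence between num_instances and instance_params described in the parameter list can be sketched as a small stand-alone function. resolve_instance_kwargs is a hypothetical helper written only to restate those rules; it is not part of yapapi, and the choice of ValueError for the "too few elements" case is an assumption.

```python
from itertools import islice
from typing import Dict, Iterable, List, Optional


def resolve_instance_kwargs(
    num_instances: Optional[int] = None,
    instance_params: Optional[Iterable[Dict]] = None,
) -> List[Dict]:
    """Return one kwargs dictionary per instance that would be spawned."""
    if num_instances is not None and num_instances < 1:
        return []  # nothing is spawned
    if instance_params is None:
        # no per-instance arguments: a single instance unless a count is given
        count = 1 if num_instances is None else num_instances
        return [{} for _ in range(count)]
    if num_instances is None:
        # the iterable alone decides how many instances there are
        return list(instance_params)
    # both given: num_instances takes precedence
    chosen = list(islice(instance_params, num_instances))
    if len(chosen) < num_instances:
        raise ValueError("instance_params has too few elements")
    return chosen
```

For example, resolve_instance_kwargs(1, [{"a": 1}, {"a": 2}]) keeps only the first dictionary, while resolve_instance_kwargs(3, [{}]) raises.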
- async create_network(ip, owner_ip=None, mask=None, gateway=None)
Create a VPN inside Golem network.
Requires yagna >= 0.8
- Parameters
ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”
owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created Network
mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)
gateway (Optional[str]) – Optional gateway address for the network
- Return type
Task API
Task
- class yapapi.Task(data)
One computation unit.
Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).
- __init__(data)
Create a new Task object.
- Parameters
data (TypeVar(TaskData)) – contains information needed to prepare command list for the provider
- property running_time: Optional[datetime.timedelta]
Return the running time of the task (if in progress) or time it took to complete it.
- Return type
Optional[timedelta]
- accept_result(result=None)
Accept the result of this task.
Must be called when the result is correct to mark this task as completed.
- Parameters
result (Optional[TypeVar(TaskResult)]) – task computation result (optional)
- Return type
None
- reject_result(reason=None, retry=False)
Reject the result of this task.
Must be called when the result is not correct to indicate that the task should be retried.
- Parameters
reason (Optional[str]) – task rejection description (optional)
- Return type
None
Service API
Service
- class yapapi.services.Service(_id=None)
Base class for service specifications.
To be extended by application developers to define their own, specialized Service specifications.
- property id: str
Return the unique id of this service instance.
- Return type
str
- property provider_name: Optional[str]
Return the name of the provider that runs this service instance.
- Return type
Optional[str]
- property network: Optional[yapapi.network.Network]
Return the Network to which this instance belongs (if any).
- Return type
Optional[Network]
- property network_node: Optional[yapapi.network.Node]
Return the network Node record associated with this instance.
- Return type
Optional[Node]
- async send_message(message=None)
Send a control message to this instance.
- send_message_nowait(message=None)
Send a control message to this instance without blocking.
May raise asyncio.QueueFull if the channel for sending control messages is full.
- async receive_message()
Wait for a control message sent to this instance.
- Return type
ServiceSignal
- receive_message_nowait()
Retrieve a control message sent to this instance.
Return None if no message is available.
- Return type
Optional[ServiceSignal]
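The blocking and non-blocking message calls behave much like operations on a bounded asyncio.Queue. The sketch below shows that analogy only: ControlChannel is a hypothetical class, and using asyncio.Queue as the underlying channel is an assumption suggested by the asyncio.QueueFull exception mentioned above.

```python
import asyncio
from typing import Any, List, Optional


class ControlChannel:
    """Hypothetical bounded channel for control messages."""

    def __init__(self, maxsize: int = 1) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def send(self, message: Any) -> None:
        await self._queue.put(message)  # waits while the queue is full

    def send_nowait(self, message: Any) -> None:
        self._queue.put_nowait(message)  # raises asyncio.QueueFull when full

    async def receive(self) -> Any:
        return await self._queue.get()  # waits until a message arrives

    def receive_nowait(self) -> Optional[Any]:
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None  # no message available


async def demo_channel() -> List[Any]:
    channel = ControlChannel(maxsize=1)
    outcomes: List[Any] = [channel.receive_nowait()]  # nothing queued yet
    channel.send_nowait("pause")
    try:
        channel.send_nowait("resume")  # the single slot is already taken
    except asyncio.QueueFull:
        outcomes.append("full")
    outcomes.append(await channel.receive())
    return outcomes


channel_outcomes = asyncio.run(demo_channel())
```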
- async static get_payload()
Return the payload (runtime) definition for this service.
To be overridden by the author of a specific Service class.
If get_payload() is not implemented, the payload will need to be provided in the run_service() call.
- Return type
Optional[Payload]
- async start()
Implement the handler for the starting state of the service.
To be overridden by the author of a specific Service class.
Should perform the minimum set of operations after which the instance of a service can be treated as “started”, or, in other words, ready to receive service requests. It’s up to the developer of the specific Service class to decide what exact operations constitute a service startup. In the most common scenario deploy() and start() are required; check the Default implementation section for more details.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.
Results of those batches can then be retrieved by awaiting the values captured from yield statements.
A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to running.
On the other hand, any unhandled exception will potentially trigger a restart of an instance on another provider node. This behavior is controlled by Service’s restart_condition property and by default restarts instances only if they encountered an error and had not been successfully started before.
To change this default, override the restart_condition.
Example:
    async def start(self):
        s = self._ctx.new_script()
        # deploy the exe-unit
        s.deploy(**self.get_deploy_args())
        # start the exe-unit's container
        s.start()
        # start some service process within the container
        s.run("/golem/run/service_ctl", "--start")
        # send the batch to the provider
        yield s
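The work generator pattern mentioned above (a handler yields batches, and an awaitable with their results comes back through the yield) can be shown with a toy driver. Nothing here is yapapi code: ToyScript, toy_start and drive are hypothetical names, and the driver only pretends to execute commands.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, List, Tuple


class ToyScript:
    """Hypothetical stand-in for a batch of commands sent to a provider."""

    def __init__(self, commands: List[str]) -> None:
        self.commands = commands


async def toy_start(log: List[str]) -> AsyncGenerator[ToyScript, Awaitable[List[str]]]:
    """A handler: yields batches and receives an awaitable with their results."""
    future_results = yield ToyScript(["deploy", "start"])
    log.extend(await future_results)
    future_results = yield ToyScript(["run service_ctl --start"])
    log.extend(await future_results)
    # Falling off the end here is the "clean exit" the driver reacts to.


async def drive(handler: AsyncGenerator[ToyScript, Awaitable[List[str]]]) -> str:
    """Toy driver: pretends to execute each batch, then resumes the handler."""
    loop = asyncio.get_running_loop()
    try:
        script = await handler.__anext__()
        while True:
            results = loop.create_future()
            results.set_result([f"done: {command}" for command in script.commands])
            # Resume the handler, passing the awaitable in through `yield`.
            script = await handler.asend(results)
    except StopAsyncIteration:
        return "running"  # next lifecycle stage after start exits cleanly


async def demo_handler() -> Tuple[List[str], str]:
    log: List[str] = []
    next_state = await drive(toy_start(log))
    return log, next_state


handler_log, handler_next_state = asyncio.run(demo_handler())
```

An exception raised inside toy_start would propagate out of drive instead of returning a next state, which is the point at which a real engine would decide whether to restart the instance.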
Default implementation
The default implementation assumes that, in order to accept commands, the runtime needs to be first deployed using the deploy() command, which is analogous to creation of a container corresponding with the desired payload, and then started using the start() command, actually launching the process that runs the aforementioned container.
Additionally, it also assumes that the exe-unit doesn’t need any additional parameters in its start() call (e.g. for the VM runtime, all the required parameters are already passed as part of the agreement between the requestor and the provider), and parameters passed to deploy() are returned by the Service.get_deploy_args() method.
Therefore, this default implementation performs the minimum required for a VM payload to start responding to run commands. If your service requires any additional operations - you’ll need to override this method (possibly first yielding from the parent - super().start() - generator) to add appropriate preparatory steps.
In case of runtimes other than VM, deploy and/or start might be optional or altogether disallowed, or they may take some parameters. It is up to the author of the specific Service implementation that uses such a payload to adjust this method accordingly based on the requirements for the given runtime/exe-unit type.
- Return type
AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
- async run()
Implement the handler for the running state of the service.
To be overridden by the author of a specific Service class.
Should contain any operations needed to ensure continuous operation of a service.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.
Results of those batches can then be retrieved by awaiting the values captured from yield statements.
A clean exit from a handler function triggers the engine to transition the state of the instance to the next stage in service’s lifecycle - in this case, to stopping.
Any unhandled exception will cause the instance to be terminated.
Example:
    async def run(self):
        while True:
            script = self._ctx.new_script()
            stats_results = script.run(self.SIMPLE_SERVICE, "--stats")
            yield script
            stats = (await stats_results).stdout.strip()
            print(f"stats: {stats}")
Default implementation
Because the nature of the operations required during the “running” state depends directly on the specifics of a given Service and because it’s entirely plausible for a service not to require any direct interaction with the exe-unit (runtime) from the requestor’s end after the service has been started, the default is to just wait indefinitely without producing any batches.
- Return type
AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
- async shutdown()
Implement the handler for the stopping state of the service.
To be overridden by the author of a specific Service class.
Should contain any operations that the requestor needs to ensure the instance is correctly and gracefully shut down - e.g. that its final state is retrieved.
As a handler implementing the work generator pattern, it’s expected to be a generator that yields Script objects (generated using the service’s instance of the WorkContext - self._ctx) that are then dispatched to the activity by the engine.
Results of those batches can then be retrieved by awaiting the values captured from yield statements.
Finishing the execution of this handler will trigger termination of this instance.
This handler will only be called if the activity running the service is still available. If the activity has already been deemed terminated or if the connection with the provider has been lost, the service will transition to the terminated state and the shutdown handler won’t be run.
Example:
    async def shutdown(self):
        script = self._ctx.new_script()
        script.run("/golem/run/dump_state")
        script.download_file("/golem/output/state", "/some/local/path/state")
        script.terminate()
        yield script
Default implementation
By default, the activity is just sent a terminate command. Whether it’s absolutely required or not, again, depends on the implementation of the given runtime.
- Return type
AsyncGenerator[Script, Awaitable[List[CommandEvent]]]
- async reset()
Reset the service to the initial state. The default implementation does nothing here.
This method is called internally when the service is restarted in yapapi.services.ServiceRunner, so it is not necessary for services that are never restarted (note that run_service() by default restarts services that didn’t start properly based on Service.restart_condition()).
Handlers of a restarted service are called more than once - all of the cleanup necessary between calls should be implemented here. E.g. if we initialize a counter in Service.__init__() and increment it in Service.start(), we might want to reset it here to the initial value.
- Return type
None
- property restart_condition: bool
Return the condition, based on which yapapi.services.ServiceRunner decides if it should restart this instance.
If restart_condition returns True, the service is restarted. In such case, before putting the service back into the pending state, the ServiceRunner calls this service’s reset method.
- Return type
bool
- property is_available
Return True if this instance is available (that is, starting, running or stopping).
- property state
Return the current state of this instance.
- async is_activity_responsive()
Verify if the provider’s activity is responsive.
Tries to get the activity state. Returns True if the activity state could be queried successfully and False otherwise.
Can be overridden in case the specific implementation of Service wants to implement a more appropriate health check of the particular service.
- Return type
bool
Cluster
- class yapapi.services.Cluster(engine, service_class, payload, expiration=None, network=None)
Golem’s sub-engine used to spawn and control instances of a single Service.
- property expiration: datetime.datetime
Return the expiration datetime for agreements related to services in this Cluster.
- Return type
datetime
- property payload: yapapi.payload.Payload
Return the service runtime definition for this Cluster.
- Return type
Payload
- property service_class: Type[yapapi.services.service.ServiceType]
Return the class instantiated by all service instances in this Cluster.
- Return type
Type[TypeVar(ServiceType, bound=Service)]
- property network: Optional[yapapi.network.Network]
Return the Network record associated with the VPN used by this Cluster.
- Return type
Optional[Network]
- property instances: List[yapapi.services.service.ServiceType]
Return the list of service instances in this Cluster.
- Return type
List[TypeVar(ServiceType, bound=Service)]
- spawn_instances(num_instances=None, instance_params=None, network_addresses=None)
Spawn new instances within this Cluster.
- Parameters
num_instances (Optional[int]) – optional number of service instances to run. Defaults to a single instance, unless instance_params is given, in which case, the Cluster will spawn as many instances as there are elements in the instance_params iterable. If num_instances is not None and < 1, the method will immediately return and log a warning.
instance_params (Optional[Iterable[Dict]]) – optional list of dictionaries of keyword arguments that will be passed to the __init__ of the consecutive, spawned instances. The number of elements in the iterable determines the number of instances spawned, unless num_instances is given, in which case the latter takes precedence. In other words, if both num_instances and instance_params are provided, the number of instances spawned will be equal to num_instances and if there are too few elements in the instance_params iterable, it will result in an error.
network_addresses (Optional[List[str]]) – optional list of network addresses in case the Cluster is attached to a VPN. If the list is not provided (or if there are fewer elements than the number of spawned instances), any instances for which the addresses have not been given will be assigned an address automatically.
- Return type
None
ServiceState
- class yapapi.services.ServiceState(model=None, state_field='state', start_value=None)
State machine describing the state and lifecycle of a Service instance.
- pending = State('pending', identifier='pending', value='pending', initial=True)
The service instance has not yet been assigned to a provider.
- starting = State('starting', identifier='starting', value='starting', initial=False)
The service instance is starting on a provider.
The activity within which the service is running has been created on a provider node and now the service instance’s start() handler is active and has not yet finished.
- running = State('running', identifier='running', value='running', initial=False)
The service instance is running on a provider.
The instance’s start() handler has finished and the run() handler is active.
- stopping = State('stopping', identifier='stopping', value='stopping', initial=False)
The service instance is stopping on a provider.
The instance’s run() handler has finished and the shutdown() handler is active.
- terminated = State('terminated', identifier='terminated', value='terminated', initial=False)
The service instance has been terminated and is no longer bound to an activity.
This means that either the service has been explicitly stopped by the requestor, or the activity that the service had been attached-to has been terminated - e.g. by a failure on the provider’s end or as a result of termination of the agreement between the requestor and the provider.
- AVAILABLE = (State('starting', identifier='starting', value='starting', initial=False), State('running', identifier='running', value='running', initial=False), State('stopping', identifier='stopping', value='stopping', initial=False))
A helper set of states in which the service instance is bound to an activity and can be interacted with.
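The lifecycle described by these states can be summarized with a plain enum and a transition table. This is a stand-alone sketch: ToyState, NEXT_STATE and the helper functions are hypothetical, the table covers only the error-free path inferred from the descriptions above, and early jumps to terminated or restarts back to pending are left out.

```python
from enum import Enum
from typing import List


class ToyState(Enum):
    PENDING = "pending"
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    TERMINATED = "terminated"


# States in which the instance is bound to an activity.
AVAILABLE = frozenset({ToyState.STARTING, ToyState.RUNNING, ToyState.STOPPING})

# Next state once the current stage completes without errors.
NEXT_STATE = {
    ToyState.PENDING: ToyState.STARTING,     # assigned to a provider
    ToyState.STARTING: ToyState.RUNNING,     # start() handler finished
    ToyState.RUNNING: ToyState.STOPPING,     # run() handler finished
    ToyState.STOPPING: ToyState.TERMINATED,  # shutdown() handler finished
}


def lifecycle(state: ToyState = ToyState.PENDING) -> List[ToyState]:
    """Follow the error-free path from `state` to the final state."""
    path = [state]
    while state in NEXT_STATE:
        state = NEXT_STATE[state]
        path.append(state)
    return path


def is_available(state: ToyState) -> bool:
    return state in AVAILABLE
```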
Network API
Network
- class yapapi.network.Network(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)
Describes a VPN created between the requestor and the provider nodes within Golem Network.
- async classmethod create(net_api, ip, owner_id, owner_ip=None, mask=None, gateway=None)
Create a new VPN.
- Parameters
net_api (Net) – the mid-level binding used directly to perform calls to the REST API.
ip (str) – the IP address of the network. May contain netmask, e.g. “192.168.0.0/24”
owner_id (str) – the node ID of the owner of this VPN (the requestor)
owner_ip (Optional[str]) – the desired IP address of the requestor node within the newly-created network
mask (Optional[str]) – Optional netmask (only if not provided within the ip argument)
gateway (Optional[str]) – Optional gateway address for the network
- Return type
Network
- property owner_ip: str
Return the IP address of the requestor node within the network.
- Return type
str
- property network_address: str
Return the network address of this network, without a netmask.
- Return type
str
- property netmask: str
Return the netmask of this network.
- Return type
str
- property gateway: Optional[str]
Return the gateway address within this network, if provided.
- Return type
Optional[str]
- property nodes_dict: Dict[str, str]
Return mapping between the IP addresses and Node IDs of the nodes within this network.
- Return type
Dict[str, str]
- property network_id: str
Return the automatically-generated, unique ID of this VPN.
- Return type
str
- async add_owner_address(ip)
Assign the given IP address to the requestor in the network.
- Parameters
ip (str) – the IP address to assign to the requestor node.
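How the ip and mask arguments relate to the network_address and netmask properties can be checked with Python's standard ipaddress module. The sketch below is independent of yapapi; parse_network is a hypothetical helper, and treating a separately passed mask as equivalent to the "/24" suffix is an assumption based on the parameter descriptions.

```python
import ipaddress
from typing import Optional


def parse_network(ip: str, mask: Optional[str] = None) -> ipaddress.IPv4Network:
    """Combine `ip` and an optional separate `mask` into one network object."""
    if mask is not None and "/" not in ip:
        ip = f"{ip}/{mask}"
    return ipaddress.IPv4Network(ip)


network = parse_network("192.168.0.0/24")
same_network = parse_network("192.168.0.0", mask="255.255.255.0")

network_address = str(network.network_address)  # address without the netmask
netmask = str(network.netmask)
first_host = str(next(network.hosts()))  # a candidate address for a node
```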
Node
- class yapapi.network.Node(network, node_id, ip)
Describes a node in a VPN, mapping a Golem node id to an IP address.
- network: yapapi.network.Network
The Network (the specific VPN) this node is part of.
- node_id: str
Golem id of the node.
- ip: str
IP address of this node in this particular VPN.
- get_deploy_args()
Generate a dictionary of arguments that are required for the appropriate Deploy command of an exescript in order to pass the network configuration to the runtime on the provider’s end.
- Return type
Dict
Exceptions
Payload definition
Payload
- class yapapi.payload.Payload(**kwargs)
Base class for descriptions of the payload required by the requestor.
example usage:
    import asyncio

    from dataclasses import dataclass
    from yapapi.props.builder import DemandBuilder
    from yapapi.props.base import prop, constraint
    from yapapi.props import inf

    from yapapi.payload import Payload

    CUSTOM_RUNTIME_NAME = "my-runtime"
    CUSTOM_PROPERTY = "golem.srv.app.myprop"

    @dataclass
    class MyPayload(Payload):
        myprop: str = prop(CUSTOM_PROPERTY, default="myvalue")
        runtime: str = constraint(inf.INF_RUNTIME_NAME, default=CUSTOM_RUNTIME_NAME)
        min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16)
        min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024)

    async def main():
        builder = DemandBuilder()
        payload = MyPayload(myprop="othervalue", min_mem_gib=32)
        await builder.decorate(payload)
        print(builder)

    asyncio.run(main())
output:
{'properties': {'golem.srv.app.myprop': 'othervalue'}, 'constraints': ['(&(golem.runtime.name=my-runtime)\n\t(golem.inf.mem.gib>=32)\n\t(golem.inf.storage.gib>=1024))']}
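The constraints entry in the output above is a single AND-ed filter expression built from the individual constraints. The function below reproduces that string format as a stand-alone sketch; join_constraints is a hypothetical helper, and returning a lone constraint unchanged is an assumption, since the output only shows the multi-constraint case.

```python
from typing import List


def join_constraints(constraints: List[str]) -> str:
    """Join individual constraints into one AND-ed filter expression."""
    if len(constraints) == 1:
        return constraints[0]  # assumption: a lone constraint needs no wrapper
    return "(&" + "\n\t".join(constraints) + ")"


expression = join_constraints(
    [
        "(golem.runtime.name=my-runtime)",
        "(golem.inf.mem.gib>=32)",
        "(golem.inf.storage.gib>=1024)",
    ]
)
```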
Package
- class yapapi.payload.package.Package
Description of a task package (e.g. a VM image) deployed on the provider nodes.
vm
- async yapapi.payload.vm.manifest(manifest, manifest_sig=None, manifest_sig_algorithm=None, manifest_cert=None, min_mem_gib=0.5, min_storage_gib=2.0, min_cpu_threads=1, capabilities=None)
Build a reference to application payload.
- Parameters
manifest (str) – base64 encoded Computation Payload Manifest https://handbook.golem.network/requestor-tutorials/vm-runtime/computation-payload-manifest
manifest_sig (Optional[str]) – an optional signature of base64 encoded Computation Payload Manifest
manifest_sig_algorithm (Optional[str]) – an optional signature algorithm, e.g. “sha256”
manifest_cert (Optional[str]) – an optional base64 encoded public certificate (DER or PEM) matching key used to generate signature
min_mem_gib (float) – minimal memory required to execute application code
min_storage_gib (float) – minimal disk storage to execute tasks
min_cpu_threads (int) – minimal available logical CPU cores
capabilities (Optional[List[Literal[‘vpn’, ‘inet’, ‘manifest-support’]]]) – an optional list of required VM capabilities
- Return type
- Returns
the payload definition for the given VM image
example usage:
    package = await vm.manifest(
        manifest=open("manifest.json.base64", "r").read(),
    )

example usage with a signed Computation Payload Manifest and additional “inet” capability:

    package = await vm.manifest(
        manifest=open("manifest.json.base64", "r").read(),
        manifest_sig=open("manifest.json.sig.base64", "r").read(),
        manifest_sig_algorithm="sha256",
        manifest_cert=open("cert.der.base64", "r").read(),
        capabilities=["manifest-support", "inet"],
    )
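The manifest argument is expected to be base64 text, so a JSON manifest has to be encoded before it is written to a file such as manifest.json.base64. The sketch below shows only that encoding step with the standard library; encode_manifest is a hypothetical helper and the dictionary is a placeholder, not a valid Computation Payload Manifest.

```python
import base64
import json
from typing import Any, Dict


def encode_manifest(document: Dict[str, Any]) -> str:
    """Serialize a manifest dictionary to JSON and base64-encode it."""
    raw = json.dumps(document).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


# Placeholder content: the real manifest schema is not reproduced here.
manifest_document = {"placeholder": "see the Computation Payload Manifest docs"}

manifest_b64 = encode_manifest(manifest_document)
decoded_document = json.loads(base64.b64decode(manifest_b64))
```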
- async yapapi.payload.vm.repo(*, image_hash=None, image_tag=None, image_url=None, image_use_https=False, repository_url='https://registry.golem.network', dev_mode=False, min_mem_gib=0.5, min_storage_gib=2.0, min_cpu_threads=1, capabilities=None)
Build a reference to application package.
- Parameters
image_hash (Optional[str]) – hash of the package’s image
image_tag (Optional[str]) – Tag of the package to resolve from Golem Registry
image_url (Optional[str]) – URL of the package’s image
image_use_https (bool) – whether to resolve to HTTPS or HTTP when using Golem Registry
repository_url (str) – override the package repository location
min_mem_gib (float) – minimal memory required to execute application code
min_storage_gib (float) – minimal disk storage to execute tasks
min_cpu_threads (int) – minimal available logical CPU cores
capabilities (Optional[List[Literal[‘vpn’, ‘inet’, ‘manifest-support’]]]) – an optional list of required VM capabilities
- Return type
- Returns
the payload definition for the given VM image
example usage:
package = await vm.repo( # if we provide only the image hash, the image will be # automatically pulled from Golem's image repository image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", )
example usage with an explicit GVMI image URL (useful to host images outside the Golem repository):
package = await vm.repo( # we still need to provide the image's hash because # the image's integrity is validated by the runtime on the provider node # # the hash can be calculated by running `sha3sum -a 224 <image_filename.gvmi>` # image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", # the URL can point to any publicly-available location on the web image_url="http://girepo.dev.golem.network:8000/docker-golem-hello-world-latest-779758b432.gvmi", )
example usage with additional constraints:
package = await vm.repo(
    image_hash="9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
    # only run on provider nodes that have more than 0.5gb of RAM available
    min_mem_gib=0.5,
    # only run on provider nodes that have more than 2gb of storage space available
    min_storage_gib=2.0,
    # only run on provider nodes with a certain number of CPU threads available
    min_cpu_threads=min_cpu_threads,
)
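The examples above mention that the image hash can be obtained with `sha3sum -a 224`. Where that tool is not at hand, the same SHA3-224 digest can be computed with Python's standard library. This is a minimal sketch: the helper name `gvmi_sha3_224` is hypothetical and not part of yapapi.

```python
import hashlib


def gvmi_sha3_224(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA3-224 hex digest of the file at `path` (hypothetical helper)."""
    digest = hashlib.sha3_224()
    with open(path, "rb") as image_file:
        # Read in chunks so that large images do not have to fit in memory.
        for chunk in iter(lambda: image_file.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The result is a 56-character hexadecimal string, the same length as the `image_hash` values used in the examples above.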
Execution control
WorkContext
- class yapapi.WorkContext(activity, agreement, storage, emitter)
Provider node’s work context.
Used to schedule commands to be sent to the provider and enable other interactions between the requestor agent’s client code and the activity on provider’s end.
- property id: str
Return unique identifier for this work context.
- Return type
str
- property provider_name: Optional[str]
Return the name of the provider associated with this work context.
- Return type
Optional[str]
- property provider_id: str
Return the id of the provider associated with this work context.
- Return type
str
- new_script(timeout=None, wait_for_results=True)
Create an instance of Script attached to this WorkContext instance.
This is equivalent to calling Script(work_context). This method is intended to provide a direct link between the two object instances.
- Return type
- async get_raw_usage()
Get the raw usage vector for the activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
- Return type
ActivityUsage
- async get_usage()
Get the current usage for the activity bound to this work context.
- Return type
ActivityUsage
- async get_raw_state()
Get the state of the activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
- Return type
ActivityState
- async get_cost()
Get the accumulated cost of the activity based on the reported usage.
- Return type
Optional[float]
Script
- class yapapi.script.Script(context, timeout=None, wait_for_results=True)
Represents a series of commands to be executed on a provider node.
New commands are added to the script either through its add() method or by calling one of the convenience methods provided (for example: run() or upload_json()). Adding a new command does not result in it being immediately executed. Once ready, a Script instance is meant to be yielded from a worker function (work generator pattern). Commands will be run in the order in which they were added to the script.
- __init__(context, timeout=None, wait_for_results=True)
Initialize a Script.
- Parameters
context (WorkContext) – A yapapi.WorkContext that will be used to evaluate the script (i.e. to send commands to the provider)
timeout (Optional[timedelta]) – Time after which this script’s execution should be forcefully interrupted. The default value is None which means there’s no timeout set.
wait_for_results (bool) – Whether this script’s execution should block until its results are available. The default value is True.
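The deferred-execution behaviour described for Script (scheduling a command returns an awaitable, nothing runs until the script is executed, and commands run in insertion order) can be illustrated with a toy model. The `ToyScript` class below is a hypothetical stand-in written for this illustration only. It does not use or reproduce yapapi's implementation.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class ToyScript:
    """Toy stand-in for a command script: queue now, execute later, in order."""

    def __init__(self) -> None:
        self._queue: List[Tuple[Callable[[], str], asyncio.Future]] = []

    def add(self, command: Callable[[], str]) -> Awaitable[str]:
        # Scheduling only records the command; nothing runs yet.
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.append((command, future))
        return future

    async def execute(self) -> None:
        # Commands run strictly in the order in which they were added.
        for command, future in self._queue:
            future.set_result(command())


async def demo() -> List[str]:
    log: List[str] = []
    script = ToyScript()
    first = script.add(lambda: log.append("deploy") or "deployed")
    second = script.add(lambda: log.append("run") or "ran")
    assert log == []  # nothing has executed at scheduling time
    await script.execute()
    return log + [await first, await second]
```

In this model the awaitables returned by `add()` resolve only after `execute()` has run, mirroring the way results become available after a script has been yielded and processed.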
- property id: int
Return the ID of this Script instance.
IDs are provided by a global iterator and therefore are guaranteed to be unique during the program’s execution.
- Return type
int
- add(cmd)
Add a yapapi.script.command.Command to the Script.
- Return type
Awaitable[CommandExecuted]
- deploy(**kwargs)
Schedule a Deploy command on the provider.
- Return type
Awaitable[CommandExecuted]
- start(*args)
Schedule a Start command on the provider.
- Return type
Awaitable[CommandExecuted]
- terminate()
Schedule a Terminate command on the provider.
- Return type
Awaitable[CommandExecuted]
- run(cmd, *args, env=None, stderr=None, stdout=None)
Schedule running a shell command on the provider.
- Parameters
cmd (str) – command to run on the provider, e.g. /my/dir/run.sh
args (str) – command arguments, e.g. “input1.txt”, “output1.txt”
env (Optional[Dict[str, str]]) – optional dictionary with environment variables
stderr (Optional[CaptureContext]) – capture context to use for stderr
stdout (Optional[CaptureContext]) – capture context to use for stdout
- Return type
Awaitable[CommandExecuted]
- download_bytes(src_path, on_download, limit=1048576)
Schedule downloading a remote file from the provider as bytes.
- Parameters
src_path (str) – remote (provider) source path
on_download (Callable[[bytes], Awaitable]) – the callable to run on the received data
limit (int) – limit of bytes to be downloaded (expected size)
- Return type
Awaitable[CommandExecuted]
- download_file(src_path, dst_path)
Schedule downloading a remote file from the provider.
- Parameters
src_path (str) – remote (provider) source path
dst_path (str) – local (requestor) destination path
- Return type
Awaitable[CommandExecuted]
- download_json(src_path, on_download, limit=1048576)
Schedule downloading a remote file from the provider as JSON.
- Parameters
src_path (str) – remote (provider) source path
on_download (Callable[[Any], Awaitable]) – the callable to run on the received data
limit (int) – limit of bytes to be downloaded (expected size)
- Return type
Awaitable[CommandExecuted]
- upload_bytes(data, dst_path)
Schedule sending bytes data to the provider.
- Parameters
data (bytes) – bytes to send
dst_path (str) – remote (provider) destination path
- Return type
Awaitable[CommandExecuted]
- upload_file(src_path, dst_path)
Schedule sending a file to the provider.
- Parameters
src_path (str) – local (requestor) source path
dst_path (str) – remote (provider) destination path
- Return type
Awaitable[CommandExecuted]
- upload_json(data, dst_path)
Schedule sending JSON data to the provider.
- Parameters
data (dict) – dictionary representing JSON data to send
dst_path (str) – remote (provider) destination path
- Return type
Awaitable[CommandExecuted]
Market strategies
- class yapapi.strategy.MarketStrategy
Abstract market strategy.
- acceptable_prop_value_range_overrides: Dict[str, yapapi.strategy.base.PropValueRange]
Optional overrides to the acceptable property value ranges.
- property acceptable_prop_value_ranges: Dict[str, yapapi.strategy.base.PropValueRange]
Return the range of acceptable property values for negotiable properties.
- Return type
Dict[str, PropValueRange]
- async respond_to_provider_offer(our_demand, provider_offer)
Respond to the provider’s OfferProposal with acceptable values for negotiable properties.
Includes negotiation of the properties required for mid-agreement payments.
- Return type
DemandBuilder
- async decorate_demand(demand)
Optionally add relevant constraints to a Demand.
- Return type
None
- async invoice_accepted_amount(invoice)
Accept full invoice amount.
- Return type
Decimal
- abstract async score_offer(offer)
Score offer. Better offers should get higher scores.
- Return type
float
- async debit_note_accepted_amount(debit_note)
Accept full debit note amount.
- Return type
Decimal
- class yapapi.strategy.WrappingMarketStrategy(base_strategy)
Helper abstract class which allows other/user defined strategies to wrap some other strategies, without overriding the attributes (e.g. defaults) defined on the derived-from strategy.
WrappingMarketStrategy classes are unusable on their own and always have to wrap some base strategy.
By default, all attributes and method calls are forwarded to the base_strategy.
- __init__(base_strategy)
Initialize instance.
- Parameters
base_strategy (BaseMarketStrategy) – the base strategy around which this strategy is wrapped.
- base_strategy: yapapi.strategy.base.BaseMarketStrategy
base strategy wrapped by this wrapper.
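Forwarding "all attributes and method calls" to a wrapped object is commonly done in Python with `__getattr__`. The sketch below shows that general technique with hypothetical classes (`BaseScorer`, `Wrapper`, `HalvingWrapper`). It is an illustration of the pattern, under the assumption that the wrapper overrides only what it needs, and is not yapapi's own code.

```python
class BaseScorer:
    """Hypothetical base strategy with one default attribute and one method."""

    default_score = 1.0

    def score(self, offer_price: float) -> float:
        return self.default_score / (1.0 + offer_price)


class Wrapper:
    """Forward every attribute the wrapper does not define to the wrapped object."""

    def __init__(self, base: object) -> None:
        self.base = base

    def __getattr__(self, name: str):
        # Called only when normal attribute lookup on the wrapper fails.
        return getattr(self.base, name)


class HalvingWrapper(Wrapper):
    """Override a single method; everything else still comes from the base."""

    def score(self, offer_price: float) -> float:
        return 0.5 * self.base.score(offer_price)
```

With this layout, `HalvingWrapper(BaseScorer()).default_score` is still read from the base object, while `score()` goes through the wrapper first.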
- class yapapi.strategy.LeastExpensiveLinearPayuMS(expected_time_secs=60, max_fixed_price=Decimal('Infinity'), max_price_for=mappingproxy({}))
A strategy that scores offers according to cost for given computation time.
- async score_offer(offer)
Score offer according to cost for expected computation time.
- Return type
float
- class yapapi.strategy.DecreaseScoreForUnconfirmedAgreement(base_strategy, factor)
A market strategy wrapper that modifies scoring of providers based on history of agreements.
It decreases the scores for providers that rejected our agreements.
The strategy keeps an internal list of providers who have rejected proposed agreements and removes them from this list when they accept one.
- on_event(event)
Modify the internal _rejecting_providers list on AgreementConfirmed/Rejected.
- This method needs to be added as an event consumer in
- Return type
None
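The bookkeeping described above (remember providers that rejected an agreement, forget them once they confirm one, and lower their scores in the meantime) can be sketched as follows. The class and method names are hypothetical, and multiplying the base score by `factor` is an assumption made for this illustration.

```python
from typing import Set


class RejectionPenalty:
    """Toy model: scale a base score down for providers that rejected an agreement."""

    def __init__(self, factor: float) -> None:
        self.factor = factor
        self.rejecting_providers: Set[str] = set()

    def on_agreement_rejected(self, provider_id: str) -> None:
        self.rejecting_providers.add(provider_id)

    def on_agreement_confirmed(self, provider_id: str) -> None:
        # A confirmed agreement clears the provider's record.
        self.rejecting_providers.discard(provider_id)

    def adjust(self, provider_id: str, base_score: float) -> float:
        # Assumption: the penalty is a simple multiplication by `factor`.
        if provider_id in self.rejecting_providers:
            return base_score * self.factor
        return base_score
```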
- class yapapi.strategy.PropValueRange(min=None, max=None)
Range definition for a negotiable property.
Used in yapapi.strategy.MarketStrategy.acceptable_prop_value_ranges()
- __contains__(item)
Check if the value fits inside the range.
When max is None, any value above min fits. When min is None, any value below max fits. When both min and max are None, any value fits.
- Return type
bool
- clamp(item)
Return a value closest to the given one, within the acceptable range.
- Parameters
item (float) – the value to clamp
- Return type
float
- Returns
clamped value
In case a range is defined with max < min (effectively making it an empty range), clamp raises a ValueError.
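The containment and clamping rules described above can be restated as a small standalone class. `RangeSketch` is a hypothetical re-implementation for illustration only. Treating both bounds as inclusive is an assumption, since the text does not say whether the endpoints themselves fit.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RangeSketch:
    """Illustrative re-implementation of the documented range semantics."""

    min: Optional[float] = None
    max: Optional[float] = None

    def __contains__(self, item: float) -> bool:
        # A missing bound never excludes a value (bounds assumed inclusive).
        above_min = self.min is None or item >= self.min
        below_max = self.max is None or item <= self.max
        return above_min and below_max

    def clamp(self, item: float) -> float:
        if self.min is not None and self.max is not None and self.max < self.min:
            raise ValueError(f"empty range: min={self.min}, max={self.max}")
        if self.min is not None and item < self.min:
            return self.min
        if self.max is not None and item > self.max:
            return self.max
        return item
```

For example, `RangeSketch(min=1, max=3).clamp(10)` yields the upper bound, while a range with only `min` set accepts any value at or above it.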
- __init__(min=None, max=None)
- class yapapi.strategy.DummyMS(max_fixed_price=Decimal('0.05'), max_price_for=mappingproxy({}), activity=None)
A default market strategy implementation.
[ DEPRECATED, use LeastExpensiveLinearPayuMS instead ]
Its score_offer() method returns SCORE_NEUTRAL for every offer with prices that do not exceed maximum prices specified for each counter. For other offers, returns SCORE_REJECTED.
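The accept-or-reject rule described above can be sketched as a plain function. The function name, the shape of the price mappings and the numeric values of the two score constants are assumptions made for this illustration. They are not taken from yapapi.

```python
from decimal import Decimal
from typing import Mapping

# Placeholder values for this sketch; not taken from yapapi.
SCORE_NEUTRAL = 0.0
SCORE_REJECTED = -1.0


def score_by_price_caps(
    fixed_price: Decimal,
    price_for: Mapping[str, Decimal],
    max_fixed_price: Decimal,
    max_price_for: Mapping[str, Decimal],
) -> float:
    """Reject an offer if any price exceeds its cap, otherwise stay neutral."""
    if fixed_price > max_fixed_price:
        return SCORE_REJECTED
    for counter, price in price_for.items():
        cap = max_price_for.get(counter)
        # Assumption: counters without a configured cap are unconstrained.
        if cap is not None and price > cap:
            return SCORE_REJECTED
    return SCORE_NEUTRAL
```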
Events
Objects representing events in a Golem computation.
Every time something important happens, an event is emitted.
Emitted events are passed to all current event consumers, set either in yapapi.Golem.__init__() or via yapapi.Golem.add_event_consumer().
Events are considered a semi-experimental feature:
The backward compatibility of subsequent future updates is not guaranteed, the interface might change between major releases without prior deprecation.
Some parts are not documented and should not be considered a public interface.
Every event is described by a set of attributes that can be divided into three groups:
- Attributes common to all events, documented on yapapi.events.Event.
- Internal parts of yapapi that define the context of the event (details in the next section).
- Additional event-specific information, e.g. the reason of the agreement termination for the AgreementTerminated event, described along the particular event classes.
Events should be consumed in a strict read-only mode: event objects are shared between all event consumers, and their attributes are used internally by the Golem engine, so any modification may have unexpected side effects.
Events inheritance tree
Only leaf events are ever emitted; other events (named *Event) are abstract classes.
Every abstract class has one more yapapi object attached than its parent, e.g.
JobEvent is an Event that happened in the context of a particular job
AgreementEvent is a JobEvent that happened in the context of a particular agreement
Event
    JobEvent
        SubscriptionFailed
        SubscriptionEvent
            SubscriptionCreated
            CollectFailed
        ProposalEvent
            ProposalReceived
            ProposalRejected
            ProposalResponded
            ProposalConfirmed
            ProposalFailed
        NoProposalsConfirmed
        JobStarted
        JobFinished
        AgreementEvent
            AgreementCreated
            AgreementConfirmed
            AgreementRejected
            AgreementTerminated
            ActivityCreateFailed
            WorkerStarted
            ActivityEvent
                ActivityCreated
                TaskEvent
                    TaskStarted
                    TaskFinished
                    TaskAccepted
                    TaskRejected
                ServiceEvent
                    ServiceStateChanged
                    ServiceFinished
                ScriptEvent
                    ScriptSent
                    CommandEvent
                        CommandStarted
                        CommandStdOut
                        CommandStdErr
                        CommandExecuted
                        DownloadStarted
                        DownloadFinished
                    GettingResults
                    ScriptFinished
                WorkerFinished
            InvoiceEvent
                InvoiceReceived
                InvoiceAccepted
            DebitNoteEvent
                DebitNoteReceived
                DebitNoteAccepted
            PaymentFailed
    ExecutionInterrupted
    ShutdownFinished
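One practical consequence of this layout is that a consumer can match on an abstract class to catch every leaf event beneath it, and can rely on the attributes that class adds. The toy hierarchy below reuses a few of the class names purely for illustration. The classes are simplified stand-ins defined here, not the real yapapi events, and plain strings stand in for the attached objects.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Event:
    """Toy root event (stand-in for the abstract base class)."""


@dataclass
class JobEvent(Event):
    job: str


@dataclass
class AgreementEvent(JobEvent):
    agreement: str


@dataclass
class AgreementConfirmed(AgreementEvent):
    """Leaf event: the only kind that would actually be emitted."""


@dataclass
class JobStarted(JobEvent):
    """Leaf event attached to a job only."""


def agreements_seen(events: List[Event]) -> List[str]:
    # Matching on the abstract class catches every leaf event below it.
    return [e.agreement for e in events if isinstance(e, AgreementEvent)]
```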
Custom events
Apart from consuming events emitted by the Python API’s internal components, application authors may wish to define their own, custom event classes. That way, Golem’s event system can be easily extended with additional, custom triggers while keeping the consumer logic uniform for both Golem’s own events and the custom ones.
Additionally, such custom events may be used to create useful, reusable components while keeping the emitting and consuming logic separate. As an example, one could provide a custom MarketStrategy that consumes an event such as ActivityEvaluated and have many different applications utilize this strategy but evaluate activities in different ways.
Example usage
Declare the class:
from yapapi.events import ActivityEvent
import attr

@attr.s(auto_attribs=True, repr=False)
class ActivityEvaluated(ActivityEvent):
    activity_score: float

Emit the event e.g. in the worker() function of the Task API:

async def worker(ctx: WorkContext, tasks):
    ...  # whatever
    activity_score = await score_activity(ctx)  # Some app-specific logic
    ctx.emit(ActivityEvaluated, activity_score=activity_score)

And consume the event anywhere you want (e.g. in a MarketStrategy method):

def event_consumer(event):
    if isinstance(event, ActivityEvaluated):
        if event.activity_score < 7:
            print(f"Oh no! Activity {event.activity.id} is scored below 7!")

golem.add_event_consumer(event_consumer)
List of event classes
- class yapapi.events.Event(*, exc_info=None)
An abstract base class for all types of events.
- exc_info: Optional[Tuple[Type[BaseException], BaseException, Optional[types.TracebackType]]]
Tuple containing exception info as returned by sys.exc_info(), if applicable.
- timestamp: datetime.datetime
Event creation time
- property exception: Optional[BaseException]
Exception associated with this event or None.
- Return type
Optional[BaseException]
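The relationship between exc_info and exception can be illustrated with a small standalone class. `FailureRecord` is a hypothetical stand-in, and returning the second element of the `sys.exc_info()` tuple is an assumption consistent with the description above, not a quote of yapapi's code.

```python
import sys
from dataclasses import dataclass
from types import TracebackType
from typing import Optional, Tuple, Type

ExcInfo = Tuple[Type[BaseException], BaseException, Optional[TracebackType]]


@dataclass
class FailureRecord:
    """Hypothetical event-like record carrying optional exception info."""

    exc_info: Optional[ExcInfo] = None

    @property
    def exception(self) -> Optional[BaseException]:
        # sys.exc_info() returns (type, value, traceback); the value is the exception.
        return self.exc_info[1] if self.exc_info else None


def capture() -> FailureRecord:
    try:
        raise RuntimeError("provider went away")
    except RuntimeError:
        return FailureRecord(exc_info=sys.exc_info())
```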
- class yapapi.events.JobEvent(job, *, exc_info=None)
- class yapapi.events.SubscriptionEvent(job, subscription, *, exc_info=None)
- class yapapi.events.ProposalEvent(job, proposal, *, exc_info=None)
- class yapapi.events.AgreementEvent(job, agreement, *, exc_info=None)
- class yapapi.events.ActivityEvent(job, agreement, activity, *, exc_info=None)
- class yapapi.events.TaskEvent(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.ServiceEvent(job, agreement, activity, service, *, exc_info=None)
- class yapapi.events.ScriptEvent(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.CommandEvent(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.InvoiceEvent(job, agreement, invoice, *, exc_info=None)
- class yapapi.events.DebitNoteEvent(job, agreement, debit_note, *, exc_info=None)
- class yapapi.events.JobStarted(job, *, exc_info=None)
- class yapapi.events.JobFinished(job, *, exc_info=None)
job is done, succeeded if exception is None, failed otherwise.
- class yapapi.events.SubscriptionCreated(job, subscription, *, exc_info=None)
- class yapapi.events.SubscriptionFailed(job, reason, *, exc_info=None)
- class yapapi.events.CollectFailed(job, subscription, reason, *, exc_info=None)
- class yapapi.events.ProposalReceived(job, proposal, *, exc_info=None)
- class yapapi.events.ProposalRejected(job, proposal, reason=None, *, exc_info=None)
We decided to reject provider’s proposal because of a reason.
- class yapapi.events.ProposalResponded(job, proposal, *, exc_info=None)
- class yapapi.events.ProposalConfirmed(job, proposal, *, exc_info=None)
- class yapapi.events.ProposalFailed(job, proposal, *, exc_info=None)
- class yapapi.events.NoProposalsConfirmed(job, timeout, *, exc_info=None)
We didn’t confirm any proposal for a period of timeout.
- class yapapi.events.AgreementCreated(job, agreement, *, exc_info=None)
- class yapapi.events.AgreementConfirmed(job, agreement, *, exc_info=None)
- class yapapi.events.AgreementRejected(job, agreement, *, exc_info=None)
- class yapapi.events.AgreementTerminated(job, agreement, reason, *, exc_info=None)
- class yapapi.events.DebitNoteReceived(job, agreement, debit_note, *, exc_info=None)
- class yapapi.events.DebitNoteAccepted(job, agreement, debit_note, *, exc_info=None)
- class yapapi.events.PaymentFailed(job, agreement, *, exc_info=None)
- class yapapi.events.InvoiceReceived(job, agreement, invoice, *, exc_info=None)
- class yapapi.events.InvoiceAccepted(job, agreement, invoice, *, exc_info=None)
- class yapapi.events.WorkerStarted(job, agreement, *, exc_info=None)
- class yapapi.events.ActivityCreated(job, agreement, activity, *, exc_info=None)
- class yapapi.events.ActivityCreateFailed(job, agreement, *, exc_info=None)
- class yapapi.events.TaskStarted(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.TaskFinished(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.ServiceStateChanged(job, agreement, activity, service, old, new, *, exc_info=None)
- class yapapi.events.ServiceFinished(job, agreement, activity, service, *, exc_info=None)
- class yapapi.events.WorkerFinished(job, agreement, activity, *, exc_info=None)
- class yapapi.events.ScriptSent(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.GettingResults(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.ScriptFinished(job, agreement, activity, script, *, exc_info=None)
- class yapapi.events.CommandExecuted(job, agreement, activity, script, command, success, message, stdout=None, stderr=None, *, exc_info=None)
- class yapapi.events.CommandStarted(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.CommandStdOut(job, agreement, activity, script, command, output, *, exc_info=None)
- class yapapi.events.CommandStdErr(job, agreement, activity, script, command, output, *, exc_info=None)
- class yapapi.events.TaskAccepted(job, agreement, activity, task, *, exc_info=None)
- class yapapi.events.TaskRejected(job, agreement, activity, task, reason, *, exc_info=None)
- class yapapi.events.DownloadStarted(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.DownloadFinished(job, agreement, activity, script, command, *, exc_info=None)
- class yapapi.events.ShutdownFinished(*, exc_info=None)
Golem completed the shutdown sequence and is no longer operative.
- class yapapi.events.ExecutionInterrupted(*, exc_info=None)
Golem was stopped by an unhandled exception in code not managed by yapapi.
Exceptions
- exception yapapi.NoPaymentAccountError(required_driver, required_network)
The error raised if no payment account for the required driver/network is available.
- exception yapapi.rest.activity.BatchTimeoutError
An exception that indicates that an execution of a batch of commands timed out.
Logging
- yapapi.log.enable_default_logger(format_='[%(asctime)s %(levelname)s %(name)s] %(message)s', log_file=None, debug_activity_api=False, debug_market_api=False, debug_payment_api=False, debug_net_api=False)
Enable the default logger that logs messages to stderr with level INFO.
If log_file is specified, the logger will also output messages with level DEBUG to the given file.
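The two-destination setup described above (INFO and higher to stderr, DEBUG and higher to an optional file) corresponds to a standard pattern with Python's `logging` module. The function below is a hypothetical sketch of that pattern using only the standard library. It is not the body of `enable_default_logger`.

```python
import logging
import sys
from typing import Optional


def configure_logger(name: str, log_file: Optional[str] = None) -> logging.Logger:
    """Log INFO and above to stderr; optionally log DEBUG and above to a file."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)  # let the handlers decide what to keep
    formatter = logging.Formatter("[%(asctime)s %(levelname)s %(name)s] %(message)s")

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.INFO)
    stderr_handler.setFormatter(formatter)
    logger.addHandler(stderr_handler)

    if log_file is not None:
        file_handler = logging.FileHandler(log_file, mode="w", encoding="utf-8")
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger
```

The per-handler levels are what make the file more verbose than the console: the logger itself passes everything from DEBUG upwards, and each handler filters independently.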
- class yapapi.log.SummaryLogger(wrapped_emitter=None)
Aggregates information from computation events to provide a high-level summary.
The logger’s log() method can be used as event_consumer callback in Golem initialization. It will aggregate the events generated and output some summary information.
The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with log() is first passed to wrapped_emitter.
For example, with the following setup, each event will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.
detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
golem = Golem(..., event_consumer=summary_logger)
- log(event)
Register an event.
- Return type
None
- yapapi.log.log_summary(wrapped_emitter=None)
Output a summary of computation.
This is a utility function that creates a SummaryLogger instance wrapping an optional wrapped_emitter and returns its log() method.
See the documentation of SummaryLogger for more information.
Utils
- yapapi.windows_event_loop_fix()
Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.
This work-around is only needed for Python 3.6 and 3.7. With Python 3.8, ProactorEventLoop is already the default on Windows.
- yapapi.get_version()
Return the version of the yapapi library package.
- Return type
str
Yapapi Contrib
Useful, reusable pieces of code built on top of yapapi.
This part of yapapi repository serves to provide requestor agent application authors with reusable components that we or third-party developers found useful while working on examples and apps that use our Python API.
They’re not strictly part of the high level API library itself and while we do intend to keep them in sync with the subsequent releases, they should be treated as experimental and their interface may change without a proper deprecation process.
Local HTTP Proxy.
A local HTTP proxy that enables easy connections to any VPN-enabled, HTTP-based services launched on Golem providers using yapapi’s Services API.
For usage in a complete requestor agent app, see the http-proxy and webapp examples in the yapapi repository.
- class yapapi.contrib.service.http_proxy.LocalHttpProxy(cluster, port, max_request_size=134217728)
Runs a local aiohttp server and processes requests through instances of HttpProxyService.
Using yapapi’s Network API (create_network()), execution units on the provider nodes can be connected to a virtual network which can then be used both for communication between those nodes (through virtual network interfaces within VMs) and between the specific nodes and the requestor agent (through a websocket endpoint in the yagna daemon’s REST API).
LocalHttpProxy and HttpProxyService use the latter to enable HTTP connections to be routed from a local port on the requestor’s host, to a specified TCP port within the VM on the provider’s end.
Example usage:
class HttpService(HttpProxyService):
    ...

cluster = await golem.run_service(
    HttpService,
    network=network,
    instance_params=[{"remote_port": 80}],  # optional, 80 by default
)

proxy = LocalHttpProxy(cluster, 8080)
await proxy.run()

...  # requests made to http://localhost:8080 are routed to port 80 within the VM

await proxy.stop()
cluster.stop()
- __init__(cluster, port, max_request_size=134217728)
Initialize the local HTTP proxy.
- Parameters
cluster (Cluster[HttpProxyService]) – a Cluster of one or more VPN-connected HttpProxyService instances.
port (int) – a local port on the requestor’s machine to listen on
max_request_size (int) – maximum client request size, defaults to 128MB
- async run()
Run a local HTTP server.
Will be listening on the specified port and passing subsequent requests to the handle_request() of the specified cluster in a round-robin fashion.
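Round-robin dispatch means that consecutive requests are handed to the service instances in a fixed repeating order. A minimal sketch of the idea using `itertools.cycle` follows. The function names and the use of plain strings for requests and instances are assumptions for illustration, not the proxy's actual code.

```python
import itertools
from typing import Iterator, List


def round_robin(instances: List[str]) -> Iterator[str]:
    """Yield instances in a repeating cycle so that load spreads evenly."""
    return itertools.cycle(instances)


def dispatch(requests: List[str], instances: List[str]) -> List[str]:
    # Each request goes to the next instance in the cycle.
    picker = round_robin(instances)
    return [f"{request} -> {next(picker)}" for request in requests]
```

With two instances, the first and third requests land on the same instance and the second on the other.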
- class yapapi.contrib.service.http_proxy.HttpProxyService(remote_port=80, remote_host=None, response_timeout=30.0)
Base class for services connected to the LocalHttpProxy.
Implements the interface used by LocalHttpProxy to route HTTP requests to the instances of an HTTP service running on providers.
- __init__(remote_port=80, remote_host=None, response_timeout=30.0)
Initialize the HTTP proxy service.
- Parameters
remote_port (int) – the port on which the service on the provider’s end listens
remote_host (Optional[str]) – optional hostname to be used in the headers passed to the remote HTTP server. If not provided, the Host header of the incoming HTTP requests will be passed.
response_timeout (float) – the timeout for the requests made to the remote server
- async handle_request(request)
Handle a single request coming from a LocalHttpProxy server by passing it to the HTTP service on the provider’s end through the VPN.
- Parameters
request (Request) – an aiohttp.web.Request
- Return type
Response
- Returns
an aiohttp.web.Response
TCP socket proxy.
A local proxy that facilitates connections to any VPN-enabled TCP services launched on Golem providers using yapapi’s Services API.
For usage in a complete requestor agent app, see the ssh example in the yapapi repository.
- class yapapi.contrib.service.socket_proxy.SocketProxy(ports, address='127.0.0.1', buffer_size=1048576, timeout=300.0)
Exposes ports of services running in VMs on providers as local ports.
The connections can be routed to instances of services connected to a Golem VPN using yapapi’s Network API (create_network()).
Example usage:
class SomeService(SocketProxyService):
    def __init__(self, remote_port: int = 4242):
        super().__init__()
        self.remote_ports = [remote_port]

...

cluster = await golem.run_service(
    SomeService,
    network=network,
)

# ensure services are started
...

proxy = SocketProxy(ports=[8484])
await proxy.run(cluster)

...  # connections to local port 8484 will be routed to port 4242 within the VM

await proxy.stop()
cluster.stop()
Example usage directly from a Service handler:
class SomeOtherService(SocketProxyService):
    remote_port = 22  # e.g. an SSH daemon

    def __init__(self, proxy: SocketProxy):
        super().__init__()
        self.proxy = proxy

    async def start(self):
        # perform the initialization of the Service
        ...
        server = await self.proxy.run_server(self, self.remote_port)

proxy = SocketProxy(ports=[2222])

cluster = await golem.run_service(
    SomeOtherService,
    network=network,
    instance_params=[{"proxy": proxy}],
)

...  # connections to local port 2222 will be routed to port 22 within the VM

await proxy.stop()
cluster.stop()
- __init__(ports, address='127.0.0.1', buffer_size=1048576, timeout=300.0)
Initialize the TCP socket proxy service.
- Parameters
ports (List[int]) – a list of local ports that will be assigned to consecutive connections
address (str) – the IP address to bind the local server to
buffer_size (int) – the size of the data buffer used for the connections
timeout (float) – the timeout in seconds for the response of the remote end
- async run_server(service, remote_port)
Run a socket proxy for a single port on an instance of a service.
- Parameters
service (Service) – the service instance
remote_port (int) – the remote port on which a TCP service is listening on the remote end
- async run(cluster)
Run the proxy servers for all ports on a cluster.
- Parameters
cluster (Cluster[SocketProxyService]) – the cluster for which the proxy connections should be enabled.
- async stop()
Stop servers for all connections.
- class yapapi.contrib.service.socket_proxy.SocketProxyService(_id=None)
Base class for services connected to the SocketProxy.
Implements the interface required by the SocketProxy.
Provider Filter.
Market strategy wrapper that enables easy exclusion of offers from certain providers using a simple boolean condition, while preserving correct scoring of the remaining offers by the base strategy.
- class yapapi.contrib.strategy.ProviderFilter(base_strategy, is_allowed)
ProviderFilter - extend a market strategy with a layer that excludes offers from certain issuers.
- Parameters
base_strategy (BaseMarketStrategy) – a market strategy that will be used to score offers from allowed providers
is_allowed (Union[Callable[[str], bool], Callable[[str], Awaitable[bool]]]) – a callable that accepts provider_id as an argument and returns either a boolean, or a boolean-returning awaitable, determining if offers from this provider should be considered (that is: scored by the base_strategy)
Example 1. Block selected providers:
bad_providers = ['bad_provider_1', 'bad_provider_2']
base_strategy = SomeStrategy(...)
strategy = ProviderFilter(base_strategy, lambda provider_id: provider_id not in bad_providers)
Example 2. Select providers using a database table:
# create an async database connection
# (sync would also work, but could hurt `yapapi` overall performance)
async_conn = ...

async def is_allowed(provider_id):
    result = await async_conn.execute("SELECT 1 FROM allowed_providers WHERE provider_id = ?", provider_id)
    return bool(result.fetchall())

base_strategy = SomeStrategy()
strategy = ProviderFilter(base_strategy, is_allowed)
Example 3. Use the default strategy, but disable every provider that fails to create an activity:
from yapapi import events

bad_providers = set()

def denying_event_consumer(event: events.Event):
    if isinstance(event, events.ActivityCreateFailed):
        bad_providers.add(event.provider_id)

golem = Golem(...)
golem.strategy = ProviderFilter(golem.strategy, lambda provider_id: provider_id not in bad_providers)
await golem.add_event_consumer(denying_event_consumer)

async with golem:
    ...
    # NOTE: this will currently work only for **new** offers from the provider, because old offers are already
    # scored, this should improve in https://github.com/golemfactory/yapapi/issues/820
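Since is_allowed may be either a plain function or a coroutine function, a filter built on it has to handle both return kinds. One way to do so, shown here as a standalone sketch, is to call the predicate and await the result only if it is awaitable. The function names and the `SCORE_REJECTED` value are placeholders for this illustration and are not taken from yapapi.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

IsAllowed = Union[Callable[[str], bool], Callable[[str], Awaitable[bool]]]
SCORE_REJECTED = -1.0  # placeholder rejection score for this sketch


async def filtered_score(provider_id: str, base_score: float, is_allowed: IsAllowed) -> float:
    """Return the base score only if the provider passes the filter."""
    allowed = is_allowed(provider_id)
    if inspect.isawaitable(allowed):
        # The predicate was async (or returned another awaitable): resolve it.
        allowed = await allowed
    return base_score if allowed else SCORE_REJECTED


async def async_allow(provider_id: str) -> bool:
    await asyncio.sleep(0)  # stands in for e.g. a database lookup
    return provider_id.startswith("good")
```

The same `filtered_score` call then works with a lambda, as in Example 1, and with an async predicate, as in Example 2.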