API documentation¶
tos¶
Python implementation module.
tos.task_object_storage.TaskObjectStorage¶
-
class
tos.task_object_storage.
TaskObjectStorage
(**options)[source]¶ -
__init__
(**options)[source]¶ Parameters: options – A dictionary of MongoDB options for the database server URL and port, and the process database name, username and password. The following is a list of accepted options as keyword arguments:
Parameters: - db_server (str) – Mongodb server uri and optional port, e.g. ‘localhost:27017’
- db_name (str) – Database name.
- db_user (str) – Database username.
- db_passw (str) – Database password.
- db_auth_source (str) – Authentication database.
- mongo_client_options (dict) – Extra options to be passed to mongo client
Example usage:
tos = TaskObjectStorage( db_server="localhost:27017", db_name="testing", db_auth_source="admin", db_user="robo-user", db_passw="secret-word", ) tos.initialize_tos()
where
db_auth_source
is the same asdb_name
by default.
-
add_binary_data_to_payload
(object_id, filepath, data_title)¶ Put an encoded binary string to database.
Parameters: - object_id (str or ObjectId) – Object id
- filepath (str) – file path to the file to encode
- data_title (str) – name of the binary data to store.
Returns: Updated task object
Return type: dict
Variables: bin_data – bson.Binary encoded binary data
-
add_key_value_pair_to_payload
(object_id, key, value)¶ Add single key-value pair to payload.
Parameters: - object_id (str or ObjectId) – Object id
- key (str) – name of the field to insert
- value (str) – value of the field to insert
Returns: Updated task object, if separated payload is updated & merged to task object
Return type: dict
-
connect
()[source]¶ Connect to MongoDB.
Returns: MongoDB client object. Return type: pymongo.MongoClient
-
create_new_task_object
(payload, process_name='', priority=0)¶ Create a new task object and insert into database.
With separate payload collection: Returns merged to task object, identical as with regular usage. Use find_task_object_by_id to get the actual, bare task object if it is required.
Parameters: - payload (dict) – The contents of the actual task object
- priority (int) – Priority of the task object, default=0.
- process_name (str) – Name of the process.
Returns: The created task object.
Return type: dict
Usage:
>>> payload = {"customer_number": 583459089} >>> create_new_task_object(payload) { "_id": ObjectId("5c519c08cd9c9f140f95b427"), "status": "new", "stage": 0, "priority": 0, "executor": "" "payload": { "customer_number": 583459089 } "analytics": {}, }
With separated payloads collection returns the same value. However the true created task object is:
{ "_id": ObjectId("5c519c08cd9c9f140f95b427"), ... "payload": { "_id": ObjectId("60db0d52d30efa2804f80a8c") } ... }
-
decrement_task_object_stage
(object_id)¶ Parameters: object_id (str or ObjectId) – Object id Returns: Updated task object Return type: dict
-
find_all_failed_task_objects
()¶ Convenience method.
-
find_all_task_objects_and_merge_them_with_payloads
(object_ids: list)¶ Finds all given task objects, and merges them with their referenced payloads.
Note: If task object contains reference to expired payload, this method will not return the task object!
Parameters: object_ids – Task objects’ Object ID’s Raises: TypeError – If separated payloads are not initialised Returns: Cursor to iterate over query results Return type: pymongo.cursor
-
find_all_task_objects_by_stage
(stages)¶ Convenience method.
-
find_all_task_objects_by_status
(statuses)¶ Convenience method.
-
find_all_task_objects_by_status_and_stage
(statuses=None, stages=None)¶ Get all task objects by status and stage.
The returned list is sorted by priority in descending order so the hightest priority item is always the first.
Filters out task-objects which have defer_until-field set in the future.
With separated payload collection: Returns task objects with BARE payload (just reference to separate payload) Use
find_all_task_objects_and_merge_them_with_payloads
to get task objects with merged payloads.Parameters: - statuses (str or list of strs) – status(es)
- stages (int or list of ints) – stage number(s)
Raises: TypeError – when invalid keyword arguments passed
Returns: list of task objects
Return type: list
Usage:
>>> find_all_task_objects_by_status_and_stage("pass", 1)
-
find_all_task_objects_marked_as_manual_and_not_passed
()¶ Get all task objects marked to manual handling.
Returns: list of task objects Return type: list
-
find_one_task_object_by_stage
(stages)¶ Convenience method.
-
find_one_task_object_by_status
(statuses)¶ Convenience method.
-
find_one_task_object_by_status_and_stage
(statuses=None, stages=None, **kwargs)¶ Get one of the task objects by status and (optionally) by stage.
The status of the task object is always set to processing when using this keyword.
The filtered results are sorted by date (by default) in ascending order so an item with the hightest priority will be returned.
Filters out task-objects which have defer_until-field set in the future.
Parameters: - statuses (str or list of strs) – status(es)
- stages (int or list of ints) – stage number(s)
- sort_condition – custom sort condition of the form [(“_id”, pymongo.ASCENDING)]
Raises: TypeError – when invalid keyword arguments passed
Returns: task object with the hightest priority.
Return type: dict
-
find_one_task_object_marked_as_manual_and_not_passed
()¶ Get one of the task objects marked as manual.
The list returned by find_all_task_objects_marked_as_manual_and_not_passed is sorted by priority so the hightest priority item is always the first.
Returns: manual task object with the hightest priority. Return type: dict
-
find_payload_document_by_payload_id
(payload_id)¶ Finds the given whole payload (with ID and timestamps) from the separate ‘payloads’-collection.
Parameters: object_id (ObjectId or str) – The payload document object id
Returns: task object
Raises: - TypeError – If separated payloads are not initialised
- ValueError – When no payload document with given id is found
Return type: dict
-
find_payload_document_by_to_id
(object_id)¶ Finds the given whole (with ID and timestamps) payload from the separate ‘payloads’-collection.
Parameters: object_id (ObjectId or str) – The payload document object id
Returns: payload document
Raises: - TypeError – If separated payloads are not initialised
- ValueError – When no payload document with given id is found
Return type: dict
-
find_task_object_by_id
(object_id)¶ Get a single task object.
This doesn’t change the status of the task object.
With separated payload collection: Returns task object with BARE payload (just reference to separate payload). Use find_task_object_by_id_and_merge_payload to get already merged payload like in a normal task object.
Parameters: object_id (ObjectId or str) – the object id Returns: task object Raises: ValueError – When no task object with given id is found Return type: dict
-
find_task_object_by_id_and_merge_payload
(object_id)¶ Finds given task object, then finds the referenced payload in separate collection, and merges it with the original task object. Resulting task object does not contain the “extra”-fields in the payload documents, such as _id or timestamps.
Parameters: object_id (ObjectId or str) – The payload document object id
Raises: - TypeError – If separated payloads are not initialised
- ValueError – When no task object or referenced payload with given id is found
Returns: task object
Return type: dict
-
increment_retry_count
(object_id)¶ Parameters: object_id (str or ObjectId) – Object id Returns: Updated task object Return type: dict
-
increment_task_object_stage
(object_id)¶ Parameters: object_id (str or ObjectId) – Object id Returns: Updated task object Return type: dict
-
initialize_tos
(collection_suffix='', separate_payloads=False, payloads_ttl_seconds=0, collection_prefix='')[source]¶ Initialize Mongo database and collection objects.
If
payloads_ttl_seconds
is given a positive non-zero value, payloads in separate collection will have the given lifetime in seconds. The existence of the index will be enforced on every initialisation.Parameters: - collection_suffix – Optional suffix to collection name
- separate_payloads – Optionally separate payloads to separate collection
- payloads_ttl_seconds – Optional lifetime for separate TTL payloads
- collection_prefix – Optional prefix to collection name
Raises: - ValueError – If separate_payloads is not given but payloads_ttl_seconds is.
- ValueError – if payloads-collection already exists but
payloads_ttl_seconds
is not given.
-
push_value_to_array_field
(object_id, value, field_name)¶ Push an arbitrary value to an array type field.
Push is an upsert operation; if the array doesn’t exist, it will be created.
With separate payloads collection: If used to push value to payload, acts on the separate payload in its own collection, and returns the merged task object.
Parameters: - object_id (str or ObjectId) – Object id
- value (str, int or dict) – value of the field to insert
- field_name (str) – Name of the array field, e.g. ‘analytics’ or ‘payload.alerts’
Returns: Updated task object, if separated payload is updated & merged to task object
Return type: dict
-
remove_field_from_payload
(object_id, key)¶ Remove single key-value pair from payload. A nested key-value pair can be removed by referring to it with dot notation.
Parameters: - object_id (str or ObjectId) – Object id
- key (str) – name of the field to remove
Returns: Updated task object, if separated payload is updated & merged to task object
Return type: dict
-
save_binary_payload_to_tmp
(task_object, payload_item_name, prefix='', suffix='')¶
-
save_retry_metadata_if_retry
(to, current_stage)¶ For every run, check if it’s a retry. If it is, populate a retry object inside the stage object. This way we get to know when and where the job has been running on failures.
-
set_retry_build_number
(object_id, stage, retry_count)¶
-
set_retry_end_time
(object_id, stage, retry_count)¶
-
set_retry_executor
(object_id, stage, retry_count)¶
-
set_retry_start_time
(object_id, stage, retry_count)¶
-
set_stage_build_number
(object_id, stage)¶
-
set_stage_end_time
(object_id, stage)¶ Sets end time, calculates stage’s duration based on start and end times, and sets duration as well.
Parameters: - object_id (str or ObjectId) – Object id
- stage (int) – target stage
Returns: Updated task object
Return type: dict
-
set_stage_executor
(object_id, stage)¶
-
set_stage_start_time
(object_id, stage)¶ Parameters: - object_id (str or ObjectId) – Object id
- stage (int) – target stage
Returns: Updated task object
Return type: dict
-
set_stage_status
(object_id, stage, status)¶ Parameters: - object_id (str or ObjectId) – Object id
- stage (int) – target stage
Returns: Updated task object
Return type: dict
-
set_task_object_analytics
(object_id, new_analytics)¶ Replace the current analytics object with an updated one.
Parameters: - object_id (str or ObjectId) – Object id
- new_analytics (dict) – new analytics object
Returns: Updated task object
Return type: dict
-
set_task_object_analytics_item
(object_id, key, value)¶ Update one item in the analytics dict.
Parameters: - object_id (str or ObjectId) – Object id
- key (str) – analytics item key
- value (str) – analytics item value
Returns: Updated task object
Return type: dict
-
set_task_object_build_number
(object_id)¶
-
set_task_object_executor
(object_id)¶
-
set_task_object_for_rerun
(object_id, defer_until=None)¶ Decrement stage and set status to pass for rerunning the previous stage. Optionally also set the earliest time when object can be processed again.
Parameters: - object_id (str or ObjectId) – Object id
- defer_until (Datetime object, optional) – earliest time of next allowed run, defaults to None
Raises: ValueError – if current task object stage is 0 or less
Returns: Updated task object
Return type: dict
-
set_task_object_job_name
(object_id)¶
-
set_task_object_last_error
(object_id, error_text)¶ Parameters: - object_id (str or ObjectId) – Object id
- error_text (str) – Exception stacktrace or error text
Returns: Updated task object
Return type: dict
-
set_task_object_payload
(object_id, new_payload)¶ Replace the current payload object with an updated one.
Parameters: - object_id (str or ObjectId) – Object id
- new_payload (dict) – new payload object
Returns: Updated task object, if separated payload is updated & merged to task object
Return type: dict
-
set_task_object_priority
(object_id, new_priority)¶ Set the priority of a task object.
Parameters: - object_id (str or ObjectId) – Object id
- new_priority (int) – New desired priority
Returns: Updated task object
Return type: dict
-
set_task_object_retry_time
(object_id, defer_until)¶ Sets the earliest time that given object’s processing can be retried.
Parameters: - object_id (str or ObjectId) – Object id
- defer_until (Datetime object) – earliest time of next allowed run
Raises: ValueError – if retry time is not datetime object
Returns: Updated task object
Return type: dict
-
set_task_object_stage
(object_id, stage: int)¶ Parameters: - object_id (str or ObjectId) – Object id
- stage (int) – target stage
Returns: Updated task object
Return type: dict
-
set_task_object_status
(object_id, status)¶ Set the task object status to one of the accepted values.
Accepted values are in
settings.VALID_STATUSES
Parameters: - object_id (str or ObjectId) – Object id.
- status (str) – status text
Returns: Updated task object
Return type: dict
-
set_task_object_to_manual_handling
(object_id)¶
-
update_exceptions
(object_id, current_stage, error_text)¶ Update task object and its stages object with the new exception.
-
update_payload
(object_id, update)¶ Update task object payload with new data. Note that only updates top level of payload - nested documents will be overwritten!
Parameters: - object_id (str or ObjectId) – Object id
- update (dict) – key-value pair(s) to add to payload
Raises: TypeError – when
update
is not a dict.Returns: Updated task object, if separated payload is updated & merged to task object
Return type: dict
-
update_payload_document_and_timestamps
(object_id, update: dict)¶ Updates the separate task object payload document in its own collection with given
update
-operation, as well as updates its modification-timestamp. Also updates the modification-timestamp in the main task object.Expects the task object ‘payload’-field to solely contain reference to the payload in its own collection. E.g.
>>> update = {"$set": {"payload": {"this_is": "example!"}} >>> update_payload_document_and_timestamps(to['_id'], update) { ..., "payload": { "_id": ObjectId("...") } "updatedAt": Date(*now*) ... } >>> find_payload_document_by_payload_id(to['payload']['_id']) { "_id": ObjectId("...")}, "payload": { "this_is": "example!" }, updatedAt: Date(*now*), createdAt: ... }
Only use this method if there is a need for custom manipulation of payload document fields. Otherwise use standard
update_payload
etc.Parameters: - object_id (str or ObjectId) – Task object id
- update (dict) – Update operation to apply
Raises: ValueError – if payload document update fails
Returns: Updated, bare task object
Return type: dict
-
update_stage_exceptions
(object_id, error_text, stage)¶ Parameters: - object_id (str or ObjectId) – Object id
- error_text (str) – Exception stacktrace or error text
Returns: Updated task object
Return type: dict
-
update_stage_object
(object_id, current_stage)¶ Update stages object with new stage.
-
update_stage_retry_count
(object_id, stage)¶ Parameters: - object_id (str or ObjectId) – Object id
- stage (int) – target stage
Returns: Updated task object
Return type: dict
-
update_status
(object_id, current_stage, status)¶ Update task object and its stages object with new status.
-
update_task_object_and_timestamp
(filter: dict, update: dict, sort=None)¶ Update task object and the modification time. Do not call this method directly.
Parameters: - filter (dict) – MongoDB query
- update (dict) – Update operation to apply
- sort (list) – Order for the query if multiple matches
Returns: Updated task object
Return type: dict
-
tos.settings¶
Settings and variables for TOS.
-
tos.settings.
DEFAULT_DB_ADDRESS
= 'mongodb://localhost:27017/'¶
tos.statuses¶
Items are given “new” status when the producer creates them. When the consumers check out the item, its status will be set as “processing”. When the consumer finishes, the status will be set “pass” or “fail” depending on the outcome. when case for manual intervention is detected, the status will be set to “expected_fail”.
TOSLibrary¶
Robot Framework keyword layer module.
TOSLibrary.TOSLibrary.TOSLibrary¶
-
class
TOSLibrary.TOSLibrary.
TOSLibrary
(db_server, db_name, db_user='', db_passw='', db_auth_source='', collection_suffix='', separate_payloads=False, payloads_ttl=0, collection_prefix='', mongo_client_options={})[source]¶ Robot Framework layer for TOS.
-
__init__
(db_server, db_name, db_user='', db_passw='', db_auth_source='', collection_suffix='', separate_payloads=False, payloads_ttl=0, collection_prefix='', mongo_client_options={})[source]¶ Initialize the MongoDB client and collection.
Register the methods inside
tos.TaskObjectStorage
asTOSLibrary
keywords.Parameters: - db_server (str) – Mongodb server uri and port, e.g. ‘localhost:27017’
- db_name (str) – Database name.
- db_user (str) – Database username.
- db_passw (str) – Database password.
- db_auth_source (str) – Authentication database.
- collection_suffix (str) – Suffix for collection. (task_objects.suffix)
- collection_prefix (str) – Prefix for collection. (prefix.task_objects, prefix.payloads)
- separate_payloads (bool) – Optionally separate payloads to separate collection
- payloads_ttl (Union[int, str]) – Optional lifetime for separate payloads. Either seconds or timestring (for example ‘30 days’ or ‘1h 10s’)
- mongo_client_options (dict) – Extra options to be passed to mongo client
-
TOSLibrary.dynamic_library.DynamicLibrary¶
-
class
TOSLibrary.dynamic_library.
DynamicLibrary
[source]¶ Dynamic library core inspired by SeleniumLibrary for Robot Framework.
You can create dynamic libraries by inheriting this class.
Example usage:
from tos.task_object_storage import TaskObjectStorage from module import another_module class TOSLibrary(DynamicLibrary): def __init__(self): super(TOSLibrary, self).__init__() tos = TaskObjectStorage() self.add_component(self) self.add_component(tos) self.add_component(another_module)
-
add_component
(component)[source]¶ Add external (class) methods to the library.
This registers an arbitrary collection of functions as keywords callable from Robot Framework.
Parameters: component – class, instance, or module that contains functions or methods (callables).
-