API documentation

tos

Python implementation module.

tos.task_object_storage.TaskObjectStorage

class tos.task_object_storage.TaskObjectStorage(**options)[source]
__init__(**options)[source]
Parameters:options – A dictionary of MongoDB options for the database server URL and port, and the process database name, username and password.

The following is a list of accepted options as keyword arguments:

Parameters:
  • db_server (str) – MongoDB server URI and optional port, e.g. ‘localhost:27017’
  • db_name (str) – Database name.
  • db_user (str) – Database username.
  • db_passw (str) – Database password.
  • db_auth_source (str) – Authentication database.
  • mongo_client_options (dict) – Extra options to be passed to mongo client

Example usage:

tos = TaskObjectStorage(
        db_server="localhost:27017",
        db_name="testing",
        db_auth_source="admin",
        db_user="robo-user",
        db_passw="secret-word",
)
tos.initialize_tos()

where db_auth_source is the same as db_name by default.
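
The defaulting rule for db_auth_source can be pictured with a small sketch. The helper name resolve_auth_source is hypothetical and is not part of tos; it only illustrates the fallback described above, using plain keyword arguments.

```python
def resolve_auth_source(**options):
    """Return a copy of the options with db_auth_source filled in."""
    resolved = dict(options)
    # Hypothetical helper: when no authentication database is given,
    # fall back to the database name, as the note above describes.
    if not resolved.get("db_auth_source"):
        resolved["db_auth_source"] = resolved.get("db_name", "")
    return resolved


options = resolve_auth_source(db_server="localhost:27017", db_name="testing")
print(options["db_auth_source"])
```

Passing db_auth_source explicitly, as in the example usage above, leaves the given value untouched.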

add_binary_data_to_payload(object_id, filepath, data_title)

Put an encoded binary string to database.

Parameters:
  • object_id (str or ObjectId) – Object id
  • filepath (str) – file path to the file to encode
  • data_title (str) – name of the binary data to store.
Returns:

Updated task object

Return type:

dict

Variables:

bin_data – bson.Binary encoded binary data

add_key_value_pair_to_payload(object_id, key, value)

Add single key-value pair to payload.

Parameters:
  • object_id (str or ObjectId) – Object id
  • key (str) – name of the field to insert
  • value (str) – value of the field to insert
Returns:

Updated task object, if separated payload is updated & merged to task object

Return type:

dict

connect()[source]

Connect to MongoDB.

Returns:MongoDB client object.
Return type:pymongo.MongoClient
create_new_task_object(payload, process_name='', priority=0)

Create a new task object and insert into database.

With a separate payload collection: returns the task object with its payload merged in, identical to regular usage. Use find_task_object_by_id to get the actual, bare task object if it is required.

Parameters:
  • payload (dict) – The contents of the actual task object
  • priority (int) – Priority of the task object, default=0.
  • process_name (str) – Name of the process.
Returns:

The created task object.

Return type:

dict

Usage:

>>> payload = {"customer_number": 583459089}
>>> create_new_task_object(payload)
{
    "_id": ObjectId("5c519c08cd9c9f140f95b427"),
    "status": "new",
    "stage": 0,
    "priority": 0,
    "executor": "",
    "payload": {
        "customer_number": 583459089
    },
    "analytics": {},
}

With a separated payloads collection, the same value is returned. However, the task object actually stored in the database is:

{
    "_id": ObjectId("5c519c08cd9c9f140f95b427"),
    ...
    "payload": {
        "_id": ObjectId("60db0d52d30efa2804f80a8c")
    }
    ...
}
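
The difference between the returned value and the stored, bare task object can be sketched with plain dictionaries. This is an in-memory illustration only, not the library's implementation: build_task_object is a hypothetical name, uuid strings stand in for MongoDB ObjectIds, and timestamps are left out.

```python
import uuid


def build_task_object(payload, priority=0, separate_payloads=False):
    """Return (task_object, payload_document) as plain dicts."""
    task_object = {
        "_id": uuid.uuid4().hex,  # stand-in for a MongoDB ObjectId
        "status": "new",
        "stage": 0,
        "priority": priority,
        "executor": "",
        "payload": payload,
        "analytics": {},
    }
    payload_document = None
    if separate_payloads:
        # The payload lives in its own document; the task object
        # keeps only a reference to that document's id.
        payload_document = {"_id": uuid.uuid4().hex, "payload": payload}
        task_object["payload"] = {"_id": payload_document["_id"]}
    return task_object, payload_document


bare, stored_payload = build_task_object(
    {"customer_number": 583459089}, separate_payloads=True
)
print(bare["payload"])
print(stored_payload["payload"])
```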
decrement_task_object_stage(object_id)
Parameters:object_id (str or ObjectId) – Object id
Returns:Updated task object
Return type:dict
disconnect()[source]
find_all_failed_task_objects()

Convenience method.

find_all_task_objects_and_merge_them_with_payloads(object_ids: list)

Finds all given task objects, and merges them with their referenced payloads.

Note: If task object contains reference to expired payload, this method will not return the task object!

Parameters:object_ids – Object IDs of the task objects to find
Raises:TypeError – If separated payloads are not initialised
Returns:Cursor to iterate over query results
Return type:pymongo.cursor
find_all_task_objects_by_stage(stages)

Convenience method.

find_all_task_objects_by_status(statuses)

Convenience method.

find_all_task_objects_by_status_and_stage(statuses=None, stages=None)

Get all task objects by status and stage.

The returned list is sorted by priority in descending order so the highest priority item is always the first.

Filters out task-objects which have defer_until-field set in the future.

With separated payload collection: Returns task objects with BARE payload (just reference to separate payload). Use find_all_task_objects_and_merge_them_with_payloads to get task objects with merged payloads.

Parameters:
  • statuses (str or list of strs) – status(es)
  • stages (int or list of ints) – stage number(s)
Raises:

TypeError – when invalid keyword arguments passed

Returns:

list of task objects

Return type:

list

Usage:

>>> find_all_task_objects_by_status_and_stage("pass", 1)
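
The selection rules described above (match status and stage, skip items deferred into the future, sort by priority in descending order) can be sketched over a plain list. This is an illustration of the semantics, not the MongoDB query the library runs; select_task_objects and the name field are hypothetical.

```python
from datetime import datetime, timedelta


def select_task_objects(task_objects, statuses, stages, now=None):
    """Filter by status and stage, drop deferred items, sort by priority."""
    now = now or datetime.now()
    # Accept a single value or a list, as the parameters above allow.
    statuses = [statuses] if isinstance(statuses, str) else list(statuses)
    stages = [stages] if isinstance(stages, int) else list(stages)
    matching = [
        to for to in task_objects
        if to["status"] in statuses
        and to["stage"] in stages
        and not (to.get("defer_until") and to["defer_until"] > now)
    ]
    # Highest priority first.
    return sorted(matching, key=lambda to: to["priority"], reverse=True)


now = datetime(2024, 1, 1, 12, 0)
items = [
    {"name": "low", "status": "pass", "stage": 1, "priority": 0},
    {"name": "high", "status": "pass", "stage": 1, "priority": 5},
    {"name": "deferred", "status": "pass", "stage": 1, "priority": 9,
     "defer_until": now + timedelta(hours=1)},
    {"name": "failed", "status": "fail", "stage": 1, "priority": 7},
]
selected = select_task_objects(items, "pass", 1, now=now)
print([to["name"] for to in selected])
```
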
find_all_task_objects_marked_as_manual_and_not_passed()

Get all task objects marked to manual handling.

Returns:list of task objects
Return type:list
find_one_task_object_by_stage(stages)

Convenience method.

find_one_task_object_by_status(statuses)

Convenience method.

find_one_task_object_by_status_and_stage(statuses=None, stages=None, **kwargs)

Get one of the task objects by status and (optionally) by stage.

The status of the task object is always set to processing when using this keyword.

The filtered results are sorted by date (by default) in ascending order so an item with the highest priority will be returned.

Filters out task-objects which have defer_until-field set in the future.

Parameters:
  • statuses (str or list of strs) – status(es)
  • stages (int or list of ints) – stage number(s)
  • sort_condition – custom sort condition of the form [("_id", pymongo.ASCENDING)]
Raises:

TypeError – when invalid keyword arguments passed

Returns:

task object with the highest priority.

Return type:

dict

find_one_task_object_marked_as_manual_and_not_passed()

Get one of the task objects marked as manual.

The list returned by find_all_task_objects_marked_as_manual_and_not_passed is sorted by priority so the highest priority item is always the first.

Returns:manual task object with the highest priority.
Return type:dict
find_payload_document_by_payload_id(payload_id)

Finds the given whole payload (with ID and timestamps) from the separate ‘payloads’-collection.

Parameters:

payload_id (ObjectId or str) – The payload document object id

Returns:

payload document

Raises:
  • TypeError – If separated payloads are not initialised
  • ValueError – When no payload document with given id is found
Return type:

dict

find_payload_document_by_to_id(object_id)

Finds the given whole (with ID and timestamps) payload from the separate ‘payloads’-collection.

Parameters:

object_id (ObjectId or str) – The object id of the task object that references the payload document

Returns:

payload document

Raises:
  • TypeError – If separated payloads are not initialised
  • ValueError – When no payload document with given id is found
Return type:

dict

find_task_object_by_id(object_id)

Get a single task object.

This doesn’t change the status of the task object.

With separated payload collection: Returns task object with BARE payload (just reference to separate payload). Use find_task_object_by_id_and_merge_payload to get already merged payload like in a normal task object.

Parameters:object_id (ObjectId or str) – the object id
Returns:task object
Raises:ValueError – When no task object with given id is found
Return type:dict
find_task_object_by_id_and_merge_payload(object_id)

Finds given task object, then finds the referenced payload in separate collection, and merges it with the original task object. Resulting task object does not contain the “extra”-fields in the payload documents, such as _id or timestamps.

Parameters:

object_id (ObjectId or str) – The task object id

Raises:
  • TypeError – If separated payloads are not initialised
  • ValueError – When no task object or referenced payload with given id is found
Returns:

task object

Return type:

dict
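
The merge step can be pictured with plain dictionaries. The sketch below assumes the payload document shape shown elsewhere in this reference (an _id, timestamps, and the data under a payload key); merge_payload is a hypothetical name and the values are made up for illustration.

```python
def merge_payload(task_object, payload_document):
    """Return a copy of the task object with the real payload inlined."""
    merged = dict(task_object)
    # Only the inner payload is copied, so the payload document's own
    # _id and timestamps do not end up in the merged task object.
    merged["payload"] = dict(payload_document["payload"])
    return merged


bare_task_object = {"_id": "task-1", "status": "new", "payload": {"_id": "payload-1"}}
payload_document = {
    "_id": "payload-1",
    "payload": {"customer_number": 583459089},
    "createdAt": "2024-01-01T12:00:00",
    "updatedAt": "2024-01-01T12:00:00",
}
merged = merge_payload(bare_task_object, payload_document)
print(merged["payload"])
```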

increment_retry_count(object_id)
Parameters:object_id (str or ObjectId) – Object id
Returns:Updated task object
Return type:dict
increment_task_object_stage(object_id)
Parameters:object_id (str or ObjectId) – Object id
Returns:Updated task object
Return type:dict
initialize_tos(collection_suffix='', separate_payloads=False, payloads_ttl_seconds=0, collection_prefix='')[source]

Initialize Mongo database and collection objects.

If payloads_ttl_seconds is given a positive non-zero value, payloads in separate collection will have the given lifetime in seconds. The existence of the index will be enforced on every initialisation.

Parameters:
  • collection_suffix – Optional suffix to collection name
  • separate_payloads – Optionally separate payloads to separate collection
  • payloads_ttl_seconds – Optional lifetime for separate TTL payloads
  • collection_prefix – Optional prefix to collection name
Raises:
  • ValueError – If payloads_ttl_seconds is given but separate_payloads is not.
  • ValueError – If the payloads collection already exists but payloads_ttl_seconds is not given.
list_collections()[source]
push_value_to_array_field(object_id, value, field_name)

Push an arbitrary value to an array type field.

Push is an upsert operation; if the array doesn’t exist, it will be created.

With separate payloads collection: If used to push value to payload, acts on the separate payload in its own collection, and returns the merged task object.

Parameters:
  • object_id (str or ObjectId) – Object id
  • value (str, int or dict) – value of the field to insert
  • field_name (str) – Name of the array field, e.g. ‘analytics’ or ‘payload.alerts’
Returns:

Updated task object. With separated payloads, the payload is updated and merged into the returned task object.

Return type:

dict
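
The upsert behaviour of a push with a dotted field name can be sketched on a plain dict. This only mirrors the semantics described above; push_value is a hypothetical helper, not the library's code, and it assumes the target field is either missing or already a list.

```python
def push_value(document, field_name, value):
    """Append value to the array at the dotted path, creating it if needed."""
    *parents, leaf = field_name.split(".")
    target = document
    for key in parents:
        # Create intermediate documents on the way down.
        target = target.setdefault(key, {})
    # Create the array on first use, then append.
    target.setdefault(leaf, []).append(value)
    return document


task_object = {"payload": {"customer_number": 583459089}}
push_value(task_object, "payload.alerts", "address missing")
push_value(task_object, "payload.alerts", {"code": 42})
print(task_object["payload"]["alerts"])
```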

remove_field_from_payload(object_id, key)

Remove single key-value pair from payload. A nested key-value pair can be removed by referring to it with dot notation.

Parameters:
  • object_id (str or ObjectId) – Object id
  • key (str) – name of the field to remove
Returns:

Updated task object. With separated payloads, the payload is updated and merged into the returned task object.

Return type:

dict
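
Dot notation for nested keys can be illustrated on a plain dict. remove_field below is a hypothetical stand-in that mimics the described behaviour in memory; it does not touch a database.

```python
def remove_field(payload, key):
    """Remove the field at the dotted path, if it exists."""
    *parents, leaf = key.split(".")
    target = payload
    for part in parents:
        target = target.get(part)
        if not isinstance(target, dict):
            return payload  # path does not exist; nothing to remove
    target.pop(leaf, None)
    return payload


payload = {"customer": {"name": "Ada", "email": "ada@example.com"}, "attempts": 1}
remove_field(payload, "customer.email")
print(payload)
```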

save_binary_payload_to_tmp(task_object, payload_item_name, prefix='', suffix='')
save_retry_metadata_if_retry(to, current_stage)

For every run, check if it’s a retry. If it is, populate a retry object inside the stage object. This way we get to know when and where the job has been running on failures.

set_retry_build_number(object_id, stage, retry_count)
set_retry_end_time(object_id, stage, retry_count)
set_retry_executor(object_id, stage, retry_count)
set_retry_start_time(object_id, stage, retry_count)
set_stage_build_number(object_id, stage)
set_stage_end_time(object_id, stage)

Sets end time, calculates stage’s duration based on start and end times, and sets duration as well.

Parameters:
  • object_id (str or ObjectId) – Object id
  • stage (int) – target stage
Returns:

Updated task object

Return type:

dict
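
The duration calculation amounts to subtracting two timestamps. The sketch below assumes datetime values and a result in seconds; the unit and storage format tos actually uses are not stated here, and stage_duration_seconds is a hypothetical name.

```python
from datetime import datetime


def stage_duration_seconds(start_time, end_time):
    """Return the elapsed time between two datetimes in seconds."""
    return (end_time - start_time).total_seconds()


start = datetime(2024, 1, 1, 12, 0, 0)
end = datetime(2024, 1, 1, 12, 1, 30)
print(stage_duration_seconds(start, end))
```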

set_stage_executor(object_id, stage)
set_stage_start_time(object_id, stage)
Parameters:
  • object_id (str or ObjectId) – Object id
  • stage (int) – target stage
Returns:

Updated task object

Return type:

dict

set_stage_status(object_id, stage, status)
Parameters:
  • object_id (str or ObjectId) – Object id
  • stage (int) – target stage
Returns:

Updated task object

Return type:

dict

set_task_object_analytics(object_id, new_analytics)

Replace the current analytics object with an updated one.

Parameters:
  • object_id (str or ObjectId) – Object id
  • new_analytics (dict) – new analytics object
Returns:

Updated task object

Return type:

dict

set_task_object_analytics_item(object_id, key, value)

Update one item in the analytics dict.

Parameters:
  • object_id (str or ObjectId) – Object id
  • key (str) – analytics item key
  • value (str) – analytics item value
Returns:

Updated task object

Return type:

dict

set_task_object_build_number(object_id)
set_task_object_executor(object_id)
set_task_object_for_rerun(object_id, defer_until=None)

Decrement stage and set status to pass for rerunning the previous stage. Optionally also set the earliest time when object can be processed again.

Parameters:
  • object_id (str or ObjectId) – Object id
  • defer_until (Datetime object, optional) – earliest time of next allowed run, defaults to None
Raises:

ValueError – if current task object stage is 0 or less

Returns:

Updated task object

Return type:

dict
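
The rerun rules above can be sketched on a plain dict: decrement the stage, set the status to pass, optionally record defer_until, and refuse when the stage is 0 or less. prepare_for_rerun is a hypothetical name; the real method performs the update in the database.

```python
from datetime import datetime, timedelta


def prepare_for_rerun(task_object, defer_until=None):
    """Return a copy of the task object set up to rerun the previous stage."""
    if task_object["stage"] <= 0:
        raise ValueError("Cannot rerun: current stage is 0 or less")
    updated = dict(task_object, stage=task_object["stage"] - 1, status="pass")
    if defer_until is not None:
        # Earliest time at which the object may be processed again.
        updated["defer_until"] = defer_until
    return updated


retry_at = datetime(2024, 1, 1, 12, 0) + timedelta(minutes=15)
rerun = prepare_for_rerun({"stage": 2, "status": "fail"}, defer_until=retry_at)
print(rerun["stage"], rerun["status"], rerun["defer_until"])
```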

set_task_object_job_name(object_id)
set_task_object_last_error(object_id, error_text)
Parameters:
  • object_id (str or ObjectId) – Object id
  • error_text (str) – Exception stacktrace or error text
Returns:

Updated task object

Return type:

dict

set_task_object_payload(object_id, new_payload)

Replace the current payload object with an updated one.

Parameters:
  • object_id (str or ObjectId) – Object id
  • new_payload (dict) – new payload object
Returns:

Updated task object. With separated payloads, the payload is updated and merged into the returned task object.

Return type:

dict

set_task_object_priority(object_id, new_priority)

Set the priority of a task object.

Parameters:
  • object_id (str or ObjectId) – Object id
  • new_priority (int) – New desired priority
Returns:

Updated task object

Return type:

dict

set_task_object_retry_time(object_id, defer_until)

Sets the earliest time that given object’s processing can be retried.

Parameters:
  • object_id (str or ObjectId) – Object id
  • defer_until (Datetime object) – earliest time of next allowed run
Raises:

ValueError – if retry time is not datetime object

Returns:

Updated task object

Return type:

dict

set_task_object_stage(object_id, stage: int)
Parameters:
  • object_id (str or ObjectId) – Object id
  • stage (int) – target stage
Returns:

Updated task object

Return type:

dict

set_task_object_status(object_id, status)

Set the task object status to one of the accepted values.

Accepted values are in settings.VALID_STATUSES

Parameters:
  • object_id (str or ObjectId) – Object id.
  • status (str) – status text
Returns:

Updated task object

Return type:

dict

set_task_object_to_manual_handling(object_id)
update_exceptions(object_id, current_stage, error_text)

Update task object and its stages object with the new exception.

update_payload(object_id, update)

Update task object payload with new data. Note that this only updates the top level of the payload; nested documents will be overwritten!

Parameters:
  • object_id (str or ObjectId) – Object id
  • update (dict) – key-value pair(s) to add to payload
Raises:

TypeError – when update is not a dict.

Returns:

Updated task object. With separated payloads, the payload is updated and merged into the returned task object.

Return type:

dict
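
The warning about nested documents is easiest to see on a plain dict, where a top-level update behaves the same way. apply_top_level_update is a hypothetical stand-in that mirrors the described semantics, including the TypeError for non-dict updates.

```python
def apply_top_level_update(payload, update):
    """Return a copy of payload with top-level keys replaced by update."""
    if not isinstance(update, dict):
        raise TypeError("update must be a dict")
    updated = dict(payload)
    # Each top-level key is replaced as a whole; nested documents
    # are not merged field by field.
    updated.update(update)
    return updated


payload = {"customer": {"name": "Ada", "email": "ada@example.com"}, "attempts": 1}
result = apply_top_level_update(payload, {"customer": {"name": "Ada L."}})
print(result)
```

In this sketch the email field is lost because the whole customer document is replaced. To change a single nested field, pass the complete nested document in the update.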

update_payload_document_and_timestamps(object_id, update: dict)

Updates the separate task object payload document in its own collection with given update-operation, as well as updates its modification-timestamp. Also updates the modification-timestamp in the main task object.

Expects the task object ‘payload’-field to solely contain reference to the payload in its own collection. E.g.

>>> update = {"$set": {"payload": {"this_is": "example!"}}}
>>> update_payload_document_and_timestamps(to['_id'], update)
{   ...,
    "payload": {
        "_id": ObjectId("...")
    },
    "updatedAt": Date(*now*),
    ...
}
>>> find_payload_document_by_payload_id(to['payload']['_id'])
{
    "_id": ObjectId("..."),
    "payload": {
        "this_is": "example!"
    },
    "updatedAt": Date(*now*),
    "createdAt": ...
}

Only use this method if there is a need for custom manipulation of payload document fields. Otherwise use standard update_payload etc.

Parameters:
  • object_id (str or ObjectId) – Task object id
  • update (dict) – Update operation to apply
Raises:

ValueError – if payload document update fails

Returns:

Updated, bare task object

Return type:

dict

update_stage_exceptions(object_id, error_text, stage)
Parameters:
  • object_id (str or ObjectId) – Object id
  • error_text (str) – Exception stacktrace or error text
Returns:

Updated task object

Return type:

dict

update_stage_object(object_id, current_stage)

Update stages object with new stage.

update_stage_retry_count(object_id, stage)
Parameters:
  • object_id (str or ObjectId) – Object id
  • stage (int) – target stage
Returns:

Updated task object

Return type:

dict

update_status(object_id, current_stage, status)

Update task object and its stages object with new status.

update_task_object_and_timestamp(filter: dict, update: dict, sort=None)

Update task object and the modification time. Do not call this method directly.

Parameters:
  • filter (dict) – MongoDB query
  • update (dict) – Update operation to apply
  • sort (list) – Order for the query if multiple matches
Returns:

Updated task object

Return type:

dict

tos.settings

Settings and variables for TOS.

tos.settings.DEFAULT_DB_ADDRESS = 'mongodb://localhost:27017/'

tos.statuses

Items are given the “new” status when the producer creates them. When a consumer checks out an item, its status is set to “processing”. When the consumer finishes, the status is set to “pass” or “fail” depending on the outcome. When a case for manual intervention is detected, the status is set to “expected_fail”.

class tos.statuses.Status[source]
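
The lifecycle described above can be sketched with a small enum. ItemStatus and finish are hypothetical names used only for illustration; the members of tos.statuses.Status are not listed in this reference, so only the status strings quoted above are taken from it.

```python
from enum import Enum


class ItemStatus(str, Enum):
    """Status strings quoted in the description above."""
    NEW = "new"
    PROCESSING = "processing"
    PASS = "pass"
    FAIL = "fail"
    EXPECTED_FAIL = "expected_fail"


def finish(succeeded, needs_manual_handling=False):
    """Pick the status a consumer sets when it finishes an item."""
    if needs_manual_handling:
        return ItemStatus.EXPECTED_FAIL
    return ItemStatus.PASS if succeeded else ItemStatus.FAIL


status = ItemStatus.NEW          # set when the producer creates the item
status = ItemStatus.PROCESSING   # set when a consumer checks it out
status = finish(succeeded=True)  # set when the consumer is done
print(status.value)
```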

tos.utils

Utility functions for TOS.

tos.utils.accept_string_object_ids(func)[source]

Convert first function argument to ObjectId.

Decorator for converting possible str type object ids to ObjectId type. Assume the object id is the first argument passed to func.
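
A decorator of this kind can be sketched as follows. To stay self-contained, the example uses a hypothetical StandInObjectId class instead of bson.ObjectId, and it decorates a plain function; how the real decorator treats self on methods is not covered here.

```python
import functools


class StandInObjectId:
    """Hypothetical stand-in for bson.ObjectId, for illustration only."""

    def __init__(self, value):
        self.value = str(value)

    def __repr__(self):
        return f"StandInObjectId({self.value!r})"


def accept_string_ids(func):
    """Convert a str first argument to StandInObjectId before calling func."""
    @functools.wraps(func)
    def wrapper(first, *args, **kwargs):
        if isinstance(first, str):
            first = StandInObjectId(first)
        return func(first, *args, **kwargs)
    return wrapper


@accept_string_ids
def echo_id(object_id):
    return object_id


converted = echo_id("5c519c08cd9c9f140f95b427")
print(converted)
```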

tos.utils.get_temporary_file(prefix='tmp_', suffix='', directory=None)[source]

Generate a safe and closed filepath.
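
One common way to obtain a safe, closed file path with the standard library is tempfile.mkstemp followed by closing the returned descriptor. The sketch below shows that approach under the same parameter names; it is an assumption about the technique, not a copy of the tos implementation, and make_closed_temp_path is a hypothetical name.

```python
import os
import tempfile


def make_closed_temp_path(prefix="tmp_", suffix="", directory=None):
    """Create an empty temporary file and return its path, already closed."""
    # mkstemp creates the file securely and returns an open descriptor.
    fd, path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=directory)
    os.close(fd)  # close it so other code can open the path freely
    return path


path = make_closed_temp_path(suffix=".txt")
print(os.path.basename(path))
os.remove(path)  # the caller is responsible for cleanup
```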

TOSLibrary

Robot Framework keyword layer module.

TOSLibrary.TOSLibrary.TOSLibrary

class TOSLibrary.TOSLibrary.TOSLibrary(db_server, db_name, db_user='', db_passw='', db_auth_source='', collection_suffix='', separate_payloads=False, payloads_ttl=0, collection_prefix='', mongo_client_options={})[source]

Robot Framework layer for TOS.

__init__(db_server, db_name, db_user='', db_passw='', db_auth_source='', collection_suffix='', separate_payloads=False, payloads_ttl=0, collection_prefix='', mongo_client_options={})[source]

Initialize the MongoDB client and collection.

Register the methods inside tos.TaskObjectStorage as TOSLibrary keywords.

Parameters:
  • db_server (str) – MongoDB server URI and port, e.g. ‘localhost:27017’
  • db_name (str) – Database name.
  • db_user (str) – Database username.
  • db_passw (str) – Database password.
  • db_auth_source (str) – Authentication database.
  • collection_suffix (str) – Suffix for collection. (task_objects.suffix)
  • collection_prefix (str) – Prefix for collection. (prefix.task_objects, prefix.payloads)
  • separate_payloads (bool) – Optionally separate payloads to separate collection
  • payloads_ttl (Union[int, str]) – Optional lifetime for separate payloads. Either seconds or timestring (for example ‘30 days’ or ‘1h 10s’)
  • mongo_client_options (dict) – Extra options to be passed to mongo client

TOSLibrary.dynamic_library.DynamicLibrary

class TOSLibrary.dynamic_library.DynamicLibrary[source]

Dynamic library core inspired by SeleniumLibrary for Robot Framework.

You can create dynamic libraries by inheriting this class.

Example usage:

from tos.task_object_storage import TaskObjectStorage
from TOSLibrary.dynamic_library import DynamicLibrary
from module import another_module

class TOSLibrary(DynamicLibrary):

    def __init__(self):
        super(TOSLibrary, self).__init__()
        tos = TaskObjectStorage()

        # Register the callables of each component as keywords.
        self.add_component(self)
        self.add_component(tos)
        self.add_component(another_module)
add_component(component)[source]

Add external (class) methods to the library.

This registers an arbitrary collection of functions as keywords callable from Robot Framework.

Parameters:component – class, instance, or module that contains functions or methods (callables).
get_keyword_arguments(name)[source]
get_keyword_documentation(name)[source]
get_keyword_names()[source]

Robot Framework dynamic API keyword collector.

Returns:all keywords registered for the library.
Return type:list
run_keyword(name, args, kwargs)[source]

Call the keyword method by name.

Parameters:
  • name – Name of the keyword.
  • args – args passed from Robot Framework (can be empty).
  • kwargs – kwargs passed from Robot Framework (can be empty).
Returns:

call to a registered keyword by name.
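
How add_component, get_keyword_names and run_keyword fit together can be sketched with a minimal dynamic library. MiniDynamicLibrary and Greeter are hypothetical; the sketch assumes keywords are the public callables of a component and does not reproduce the actual DynamicLibrary code.

```python
import inspect


class MiniDynamicLibrary:
    """Minimal registry of keyword name to callable."""

    def __init__(self):
        self._keywords = {}

    def add_component(self, component):
        # Register every public callable found on the component.
        for name, member in inspect.getmembers(component, callable):
            if not name.startswith("_"):
                self._keywords[name] = member

    def get_keyword_names(self):
        return sorted(self._keywords)

    def run_keyword(self, name, args, kwargs):
        return self._keywords[name](*args, **kwargs)


class Greeter:
    def greet(self, name, punctuation="!"):
        return f"Hello, {name}{punctuation}"


library = MiniDynamicLibrary()
library.add_component(Greeter())
print(library.get_keyword_names())
print(library.run_keyword("greet", ["Robot"], {"punctuation": "?"}))
```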