API documentation

RPALibrary

RPALibrary.stages.BaseStage

class RPALibrary.stages.BaseStage.BaseStage[source]

RPA Library base class to be used for writing Robot Framework / Python stage definitions using TOSLibrary.

This class is inherited by Consumer and Producer. These classes contain error handling, logging, and TOS interaction, so users don’t have to write them manually every time they write new stages.

__init__()[source]

Remember to call this constructor when inheriting.

See the example in the RPALibrary class docstring above.

Variables:
  • self.tags – Tags of the current Robot Framework task.
  • self.tos – TOSLibrary instance of the current RF suite.
  • self.error_msg – Library-wide general error message text.
action_on_fail(to)[source]

Custom action to do when an error is encountered.

This is always called after the automatic error handlers have done their job. Define here any custom action, or any steps that should always run after every error handler.

E.g. fail the robot immediately with keyword “Fail”.

Note that these actions are not error handled: all exceptions are propagated until Robot Framework stops execution with a failure.

action_on_skip(to)[source]

Custom action to do when the special skip exception is encountered.

This functionality is identical to action_on_fail, except that this handler is called instead when a SkipProcessing exception is raised. Use this method to handle skips differently if needed.

post_action(to, status, *args, **kwargs)[source]

Teardown steps.

Action to do for every task object after the main action has completed successfully or failed.

You should make the implementation yourself, if needed.

Parameters:
  • to (dict) – task object
  • status (str) – status returned from running handle_errors decorated main_action.
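
The three hooks above can be pictured with a small stand-in. This is only a sketch: SketchStage, run_one, and the local SkipProcessing class are hypothetical and do not come from RPALibrary, and the relative order of the fail/skip hook and post_action is an assumption rather than documented behavior.

```python
class SkipProcessing(Exception):
    """Local stand-in for RPALibrary.exceptions.SkipProcessing."""


class SketchStage:
    """Hypothetical stand-in, NOT the real BaseStage."""

    def __init__(self):
        self.calls = []  # records which hooks ran, in order

    def action_on_fail(self, to):
        self.calls.append(("action_on_fail", to["id"]))

    def action_on_skip(self, to):
        self.calls.append(("action_on_skip", to["id"]))

    def post_action(self, to, status, *args, **kwargs):
        self.calls.append(("post_action", to["id"], status))

    def run_one(self, to, main_action):
        """Assumed dispatch: choose a hook by outcome, then run post_action."""
        try:
            main_action(to)
            status = "pass"
        except SkipProcessing:
            status = "skip"
            self.action_on_skip(to)
        except Exception:
            status = "fail"
            self.action_on_fail(to)
        self.post_action(to, status)
        return status


def main_action(to):
    """Made-up business rule used only to trigger each outcome."""
    if to["amount"] == 0:
        raise SkipProcessing("nothing to do")
    if to["amount"] < 0:
        raise ValueError("negative amount")


stage = SketchStage()
statuses = [
    stage.run_one({"id": i, "amount": amount}, main_action)
    for i, amount in enumerate([10, 0, -5])
]
```

In this sketch post_action runs for every task object, while action_on_skip and action_on_fail run only for the matching outcome.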

RPALibrary.stages.Producer

class RPALibrary.stages.Producer.Producer[source]
__init__()[source]

Remember to super call this constructor if your stage has its own __init__: super(YourStage, self).__init__()

Variables:
  • self.tags – Tags of the current Robot Framework task.
  • self.tos – TOSLibrary instance of the current RF suite.
  • self.error_msg – Library-wide general error message text.
log_task_objects_in_queue()[source]

Log the number of passed task objects in stage 0.

main_loop(*args, **kwargs)[source]

The main loop for creating new task objects. Call this as a Robot Framework keyword, don’t redefine yourself.

Methods called inside main_loop:

  • preloop_action() (optional) prefetches the data, and is the common use case (read excel, inbox, etc.)
  • process_data() (required) does some additional work for every item that has been read in preloop_action. Task objects are created from the return value of process_data(). Note that task objects are always created after this step. If you need to filter out some data items, that must be done in preloop_action().
  • Alternatively process_data() can do things on its own. For example, poll inbox as long as there are new messages, and for every message do some action.

Example usage:

from RPALibrary.stages.Producer import Producer


class ExampleStage(Producer):
    def preloop_action(self):
        return ["one", "two", "five"]

    def process_data(self, item=None):
        mapping = {"one": 1, "two": 2, "five": 3}
        return {"magic_number": mapping[item]}

The result will be three task objects with payload values {"magic_number": 1}, {"magic_number": 2}, and {"magic_number": 3}.
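
The flow described above, including filtering in preloop_action(), might look roughly like the following stand-in. SketchProducer and its main_loop are hypothetical and use no TOS or database. The "stage" and "status" fields on the created objects are assumptions made for illustration.

```python
MAPPING = {"one": 1, "two": 2, "five": 3}
RAW_ITEMS = ["one", "two", "unknown", "five"]  # made-up input data


class SketchProducer:
    """Hypothetical stand-in for the Producer loop, NOT the real class."""

    def preloop_action(self):
        # Filtering happens here, because every item that reaches
        # process_data() ends up as a task object.
        return [item for item in RAW_ITEMS if item in MAPPING]

    def process_data(self, item=None):
        return {"magic_number": MAPPING[item]}

    def main_loop(self):
        # Assumed flow: prefetch once, then build one payload per item.
        self.data = iter(self.preloop_action())
        created = []
        for item in self.data:
            payload = self.process_data(item)
            # The real library stores the object in TOS; this sketch
            # only collects plain dicts with assumed field names.
            created.append({"stage": 0, "status": "pass", "payload": payload})
        return created


task_objects = SketchProducer().main_loop()
payloads = [to["payload"] for to in task_objects]
```

The "unknown" item never becomes a task object because it is dropped before the loop starts.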

postprocess_data(to)[source]

Postprocess already created task object data. Optional.

Do some post processing on the data, e.g. parse raw email body. If this fails, the task object status will change into the corresponding failure.

Remember that the task object is already created when this step is run.

preloop_action() → Union[list, tuple, Generator[T_co, T_contra, V_co]][source]

Prefetch the data for processing. Optional.

This can return a sequence or a generator. The return value will be turned into a list_iterator anyway inside main_loop. The iterator will be assigned to self.data.

Examples:

  • Read all unread messages from inbox into a list.
  • Read excel data into Pandas DataSeries and lazily iterate over it.

Returns:data as a sequence (can be list, tuple) or a generator.
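
As a plain-Python illustration of the return types above: calling iter() on a list gives a list_iterator, on a tuple a tuple_iterator, and a generator is already its own iterator. Whether main_loop wraps the return value exactly this way is an assumption, and rows_from_source is a hypothetical data source.

```python
def rows_from_source():
    """Hypothetical lazy source, standing in for e.g. spreadsheet rows."""
    for number in range(3):
        yield {"row": number}


as_list = iter(["a", "b"])
as_tuple = iter(("a", "b"))
as_generator = iter(rows_from_source())

# Names of the iterator types that plain Python produces.
kinds = [type(it).__name__ for it in (as_list, as_tuple, as_generator)]

# Items are only produced when requested, so a generator stays lazy.
first_row = next(as_generator)
```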
process_data(item=None) → dict[source]

Define how the data is turned into a task object. Required.

Task objects will be created from the data this method returns. It’s important to define this with the item parameter.

Example: read raw data of one email message to be put into a task object.

Two use cases:

  • We read excel or inbox contents into a (list) iterator in preloop_action → This method will need to get one item from that iterator at a time.
  • We read something one item at a time here and want to process them immediately. → This method will do the fetching and processing on its own. Nothing is fetched into a list before the loop.
Parameters:item – data item to parse/process.
Returns payload:
 payload contents as a dict. main_loop will automatically put the contents into TOS as a new object document.

RPALibrary.stages.Consumer

class RPALibrary.stages.Consumer.Consumer[source]

Consumer base definition. Inherit your own consumer stage from this.

Every inheriting library must have a method main_action which defines the steps to be done for every task object in the current stage. To run the actions, call main_loop defined here (override only when necessary).

Usage example:

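import io

from PyPDF2 import PdfFileMerger  # assumed source of PdfFileMerger
from robot.api.deco import keyword

from RPALibrary.stages import Consumer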

class PDFMerger(Consumer):

    def __init__(self):
        '''
        Remember to call ``super`` in the constructor.
        '''
        super(PDFMerger, self).__init__()
        self.merger = PdfFileMerger()

    @keyword
    def merge_all_pdfs(self, filename):
        '''
        Get every valid task object from the DB
        and do the action defined in ``main_action``.
        Exceptions are handled and logged appropriately.
        '''
        count = self.main_loop(current_stage=4)
        if count:
            write_merged_pdf_object(filename)

    def main_action(self, to):
        '''Append pdf as bytes to the pdf merger object.'''
        pdf_bytes = io.BytesIO(to["payload"]["pdf_letter"])
        self.merger.append(pdf_bytes)

And the corresponding Robot Framework script:

*** Settings ***
Library                 PDFMerger

*** Tasks ***
Manipulate PDF files
    Merge All PDFs      combined.pdf
__init__()[source]

Remember to super call this constructor if your stage has its own __init__: super(YourStage, self).__init__()

Variables:
  • self.tags – Tags of the current Robot Framework task.
  • self.tos – TOSLibrary instance of the current RF suite.
  • self.error_msg – Library-wide general error message text.
  • self.stop_after_limit – If set, looping will stop after this many task objects.
  • self.should_stop_reason – If set, looping will stop.
main_action(to)[source]

The main action by which each task object is “consumed”.

You should make the implementation yourself. This will be called in the main_loop and should contain all the steps that should be done with the data stored in one task object.

Don’t call this from Robot Framework, call main_loop instead.

Parameters:to (dict) – task object
main_loop(*args, **kwargs)[source]

The main loop for processing task objects on a given stage.

Get task objects ready for processing and do the actions as defined in method main_action(). Continue doing this as long as valid task objects are returned from the DB. The counter value must be returned for logger decorator consumption.

Using this method/keyword gives you automatic logging and looping over valid task objects. You can override this method to suit your specific needs.

Remember to explicitly call this from Robot Framework.

Parameters:kwargs
  • stage (int or str) - the current stage where this is called from. If this is not given, the stage is inferred from the Robot Framework task level tag.
  • status (str) - the status of the task objects to process (default is ‘pass’)
  • change_status (bool) - decide if the status should be changed after main_action or kept as the original (default is True).
  • error_msg (str) - Custom error message if the action here fails (optional).
  • getter (callable) - the method used to get the data to process. This might be a custom input data fetcher. By default it is find_one_task_object_by_status_and_stage. Note that using a custom getter is considered experimental. A custom getter could be, e.g., collection.find_one_and_update. It is very important to update the object state in the same operation, otherwise main_loop will loop infinitely! This functionality will be deprecated in the future.
  • getter_args (dict) - arguments that should be passed to the custom getter. By default the arguments are {"statuses": status, "stages": previous_stage}. Note that they must be given as a dict whose keys are the argument names, e.g. {"filter": {"payload.age": "older"}} when the getter signature is find_one_and_update(filter=None, update=None).
  • amend (dict) - additional MongoDB query to be used for filtering task objects (optional).
  • main_keyword (str) - custom method name to use in place of main_action (optional).
  • sort_condition (str) - custom condition to be used in sorting the task objects, e.g. sort_condition=[("_id", pymongo.DESCENDING)] (optional).
Returns:number of task objects processed
Return type:int
Variables:new_status – the new status returned from the handle_errors() decorator.
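
The warning about custom getters can be illustrated with an in-memory stand-in. Everything here is hypothetical: InMemoryCollection only imitates the shape of find_one_and_update(filter=None, update=None) with plain field updates, and sketch_loop assumes that getter_args is unpacked as keyword arguments. Neither is the library's implementation.

```python
class InMemoryCollection:
    """Hypothetical stand-in for a database collection."""

    def __init__(self, documents):
        self.documents = documents

    def find_one_and_update(self, filter=None, update=None):
        """Return the first match and change its state in the same step."""
        for document in self.documents:
            if all(document.get(k) == v for k, v in (filter or {}).items()):
                found = dict(document)  # snapshot before the update
                document.update(update or {})
                return found
        return None


def sketch_loop(getter, getter_args, main_action):
    """Assumed loop shape: call the getter until it returns nothing."""
    count = 0
    while True:
        to = getter(**getter_args)
        if to is None:
            break
        main_action(to)
        count += 1
    return count


collection = InMemoryCollection([
    {"_id": 1, "status": "pass"},
    {"_id": 2, "status": "pass"},
    {"_id": 3, "status": "fail"},
])
seen = []
count = sketch_loop(
    getter=collection.find_one_and_update,
    getter_args={
        "filter": {"status": "pass"},
        "update": {"status": "processing"},
    },
    main_action=lambda to: seen.append(to["_id"]),
)
```

Because each fetched document stops matching the filter as soon as it is returned, the loop ends after two objects. Without the update argument the first document would be returned forever.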
pre_action(to)[source]

Setup steps.

Action to do for every task object before the error handled main action.

You should make the implementation yourself, if needed.

Parameters:to (dict) – task object

RPALibrary.deco

Decorators for Python-based keywords.

RPALibrary.deco.error_name(error, critical=False)[source]

Name error handlers with corresponding error messages.

Parameters:
  • error (Enum) –
  • critical (bool) – if error is critical, shut down and don’t try to retry or continue. Default False.
RPALibrary.deco.handle_errors(error_msg='') → Tuple[dict, str, str][source]

Decorator for handling all general exceptions.

The decorated function func is the set of actions we are trying to do (e.g. the main_action method). It can take arbitrary arguments. All exceptions raised when it is called are caught. When an exception occurs, the full stack trace is logged with the Robot Framework logger and the status of the task object is set to ‘fail’.

The task object should be passed as a keyword argument so it can be accessed here inside the decorator, but really it is used only for logging. Nothing breaks if it is omitted.

Returns:tuple of (value, status, error), where value is the return value of the decorated function or None, and the error is the text from the exception encountered in this function call. Status is always either “pass”, “fail”, “expected_fail”, or “skip”.

Usage example:

class RobotLibrary:

    def __init__(self):
        self.error_msg = "Alternative error message"

    @handle_errors("One is not one anymore")
    def function_which_might_fail(self, to=None):
        if to["one"] != 1:
            raise ValueError
>>> RobotLibrary().function_which_might_fail(to={"one": 2})
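
To make the (value, status, error) return shape concrete, here is a minimal stand-in decorator. handle_errors_sketch and LOGGED are hypothetical names; the sketch only distinguishes "pass" and "fail", and both the choice of error text and the empty string on success are assumptions, not the library's behavior.

```python
import functools
import traceback

LOGGED = []  # stands in for the Robot Framework logger


def handle_errors_sketch(error_msg=""):
    """Hypothetical stand-in for handle_errors, NOT the real decorator."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                value = func(*args, **kwargs)
            except Exception as exc:
                # Keep the full stack trace, then report a failure.
                LOGGED.append(traceback.format_exc())
                # Assumption: fall back to error_msg for empty exceptions.
                return None, "fail", str(exc) or error_msg
            return value, "pass", ""

        return wrapper

    return decorator


class RobotLibrary:
    @handle_errors_sketch("One is not one anymore")
    def function_which_might_fail(self, to=None):
        if to["one"] != 1:
            raise ValueError
        return to


failed = RobotLibrary().function_which_might_fail(to={"one": 2})
passed = RobotLibrary().function_which_might_fail(to={"one": 1})
```

The caller never sees the ValueError; it receives a tuple and can decide what to do with the status.
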
RPALibrary.deco.log_number_of_created_task_objects(func)[source]

Decorator for logging the number of processed task objects.

Note that the function to decorate should return the number of processed objects.

RPALibrary.exceptions

Exceptions for TOSLibrary.

exception RPALibrary.exceptions.AbortException[source]
exception RPALibrary.exceptions.BusinessException[source]

A known, non-technical error or another reason to stop running.

exception RPALibrary.exceptions.CannotCreateTaskObject[source]
exception RPALibrary.exceptions.DataAlreadyProcessed[source]
exception RPALibrary.exceptions.SkipProcessing[source]

A known case, e.g. when the input is invalid and nothing should be done about it.

This is not a failure.

RPALibrary.exceptions.get_status_by_exception(exception)[source]

Get status string by exception.

The given exception instance can be inherited from the known base exceptions defined here in RPALibrary.
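
One way to picture a lookup that also works for inherited exceptions is an isinstance check against the known base classes. The classes below are local stand-ins, status_for is a hypothetical name, and the mapping of BusinessException to "expected_fail" is an assumption based on the status names listed for handle_errors.

```python
class BusinessException(Exception):
    """Local stand-in for RPALibrary.exceptions.BusinessException."""


class SkipProcessing(Exception):
    """Local stand-in for RPALibrary.exceptions.SkipProcessing."""


# Assumed mapping from base exception to status string.
STATUS_BY_BASE = (
    (SkipProcessing, "skip"),
    (BusinessException, "expected_fail"),
)


def status_for(exception):
    """Return a status for an exception instance, honoring inheritance."""
    for base, status in STATUS_BY_BASE:
        if isinstance(exception, base):
            return status
    return "fail"  # anything unknown counts as a plain failure


class InvoiceAlreadyPaid(BusinessException):
    """Made-up project-specific exception."""


results = [
    status_for(InvoiceAlreadyPaid("paid last week")),
    status_for(SkipProcessing("empty input")),
    status_for(KeyError("missing field")),
]
```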

RPALibrary.helpers

Utility functions for TOSLibrary.

RPALibrary.helpers.get_stage_from_tags(tags: List[T]) → int[source]

Get the stage number from the stage tag.

It is required that the stage tags include one tag of the form ‘stage_0’.

Example:

>>> tags = ["stage_0", "repeat"]
>>> get_stage_from_tags(tags)
0
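
For illustration, the tag convention can be parsed as follows. stage_from_tags_sketch is a hypothetical re-implementation, and raising ValueError when no stage tag is present is an assumption about error handling, not the helper's documented behavior.

```python
import re


def stage_from_tags_sketch(tags):
    """Return N from the first tag of the form 'stage_N'."""
    for tag in tags:
        match = re.fullmatch(r"stage_(\d+)", tag)
        if match:
            return int(match.group(1))
    # Assumption: a missing stage tag is treated as an error.
    raise ValueError("no tag of the form 'stage_<number>' found")


stage_zero = stage_from_tags_sketch(["stage_0", "repeat"])
stage_twelve = stage_from_tags_sketch(["repeat", "stage_12"])
```
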
RPALibrary.helpers.handle_dead_selenium()[source]

If selenium/chromedriver is killed on purpose or by error, stop the robot execution to prevent unnecessary retrying.

RPALibrary.helpers.handle_signals()[source]

Listen for TERM / INT signals and handle them appropriately.

RPALibrary.helpers.repeat_call_until(*args, **kwargs)[source]

Repeat given function or method call until given condition is True.

Parameters:kwargs
  • callable (func) - Callable method or function (required).
  • condition (func) - Condition to wait for (required)
  • arguments (tuple) - Tuple of arguments to pass for the callable.
  • kwarguments (dict) - Dict of kw arguments to pass for the callable.
  • limit (int) - limit for retries.
  • sleep (int) - time to sleep between retries.

Usage:

class RobotLibrary:
    '''Example RF library with retry condition as a method.'''
    def btn1_is_enabled(self):
        return browser.find_element_by_name("btn1").is_enabled()

    @keyword
    def retry_clicking_until_success(self):
        repeat_call_until(
            callable=BuiltIn().run_keyword,
            condition=self.btn1_is_enabled,
            arguments=("SeleniumLibrary.Click Element", "btn2"),
            limit=5
        )
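
The retry semantics can be sketched without a browser. repeat_call_until_sketch is a hypothetical stand-in; the default values, the return value, and raising RuntimeError when the limit is reached are all assumptions.

```python
import time


def repeat_call_until_sketch(callable, condition, arguments=(),
                             kwarguments=None, limit=3, sleep=0):
    """Call `callable` until `condition()` is true or `limit` is reached."""
    kwarguments = kwarguments or {}
    for attempt in range(1, limit + 1):
        callable(*arguments, **kwarguments)
        if condition():
            return attempt  # number of calls that were needed
        time.sleep(sleep)
    raise RuntimeError("condition was not met after %d calls" % limit)


clicks = []


def click(name):
    """Made-up action standing in for a click keyword."""
    clicks.append(name)


def enabled_after_three_clicks():
    """Made-up condition standing in for an element state check."""
    return len(clicks) >= 3


attempts = repeat_call_until_sketch(
    callable=click,
    condition=enabled_after_three_clicks,
    arguments=("btn2",),
    limit=5,
)
```
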
RPALibrary.helpers.sigint_handler(signum, frame)[source]

Detect SIGINT or Keyboard interrupt signals.

Pass the information as exception message to action_on_fail later on to be handled.

RPALibrary.helpers.sigterm_handler(signum, frame)[source]

Detect SIGTERM signals.

Pass the information as exception message to action_on_fail later on to be handled.

RPALibrary.helpers.take_screenshot()[source]

Take screenshot of all visible screens.

Use Robot Framework standard library with Scrot screenshotting utility.

RPALibrary.messages

RPAListener

RPAListener

class for Robot Framework-driven RPA using TOS.

class RPAListener.RPAListener.RPAListener(db_server, db_name, db_user=None, db_passw=None, db_auth_source=None, collection_suffix='', separate_payloads=False, payloads_ttl=0, item_variable_name='ITEM', item_identifier_field=None, collection_prefix='', mongo_client_options={})[source]

RPAListener class is used for automatic handling of task objects to be worked by RPA stage definitions that have been defined in Robot Framework script.

run_keyword_and_skip_on_failure(name, *args)[source]

Runs the keyword and sets task object status to skip if failure occurs.

The keyword to execute and its arguments are specified using name and *args exactly like with Run Keyword.

Skip Task Object is called conditionally based on the execution result, causing the rest of the current keyword and task to be skipped in case of failure.

The error message from the executed keyword is printed to the console and set to the task object’s exception field.

Process execution proceeds with the next task object.

Example:

Run Keyword And Skip On Failure    Should Be True    ${invoice_amount} > 0
...    msg=Invoice is for a zero amount, no action required
skip_task_object(reason, to_id=None)[source]

Sets the task object status to skip and skips the rest of the current task.

By default, targets the current task object. Object id is persisted in class attribute skipped_task_objects.

update_current_item(*key_value_pairs, **items)[source]

Update the contents of the currently worked item’s payload in the database.

The database document will be upserted with values from the item’s payload dictionary at the time of calling. Additional variables can be added with the given key_value_pairs and items.

Giving items as key_value_pairs means giving keys and values as separate arguments:

Set To Dictionary    ${D1}    key    value    second    ${2}
${D1} = {'a': 1, 'key': 'value', 'second': 2}

Set To Dictionary    ${D1}    key=value    second=${2}

The latter syntax is typically more convenient to use, but it has a limitation that keys must be strings.

If given keys already exist in the dictionary, their values are updated.
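
The two argument styles can be compared in plain Python. merge_items is a hypothetical helper that only shows how alternating key/value arguments and named arguments could be merged into a payload dictionary; it is not the keyword's implementation and does not touch a database.

```python
def merge_items(payload, *key_value_pairs, **items):
    """Merge alternating key/value arguments and named items into a copy."""
    if len(key_value_pairs) % 2:
        raise ValueError("key_value_pairs must come in key/value pairs")
    merged = dict(payload)
    # Even positions are keys, odd positions are their values.
    merged.update(zip(key_value_pairs[::2], key_value_pairs[1::2]))
    merged.update(items)
    return merged


positional = merge_items({"a": 1}, "key", "value", "second", 2)
named = merge_items({"a": 1}, key="value", second=2)

# Only the positional style can carry a non-string key.
with_int_key = merge_items({"a": 1}, 7, "seven")
```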

class RPAListener.RPAListener.RPATask(name, task_template)[source]

An RPATask object is created for each task iteration in a transactional RPA process stage.