Sub-tasking / Task Callbacks

What is sub-tasking?

Sub-tasking is the ability for a task to spin off sub-tasks and wait for them to finish before continuing its own execution. A task waits for all of its sub-tasks to complete before potentially entering a "submitted" state itself for an agent to pick it up.

When a task has outstanding subtasks, its status will change to "delegating" while it waits for them all to finish.
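
As a rough mental model only (this is a toy sketch, not Mythic's implementation; the function name `parent_status` and the use of boolean completion flags are assumptions made for illustration), the parent's status can be thought of as derived from whether any subtask is still outstanding:

```python
def parent_status(subtask_completed_flags):
    """Toy model: return the parent's status given its subtasks' completion flags."""
    # While at least one subtask has not completed, the parent is "delegating".
    if not all(subtask_completed_flags):
        return "delegating"
    # Once every subtask is done, the parent may move on to "submitted".
    return "submitted"


waiting = parent_status([True, False])   # one subtask still running
ready = parent_status([True, True])      # all subtasks finished
print(waiting, ready)
```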

What are task callbacks?

Task callbacks are functions that get executed when a task enters a "completed=True" state (i.e. when it completes successfully or encounters an error). These can be registered on a task itself:

task.completed_callback_function = self.shell_completed

or on a subtask:

resp = await MythicRPC().execute("create_subtask", parent_task_id=task.id,
        command="pwd", params="",
        subtask_callback_function=self.shell_subtask_completed.__name__)

Where are they?

Like everything else associated with a Command, all of this information is stored in your command's Python file. Sub-tasks are created via RPC functions from within your command's create_tasking function (or any other function, e.g. you can issue more sub-tasks from within task callback functions). Let's look at an example below. This shell tasking checks whether the user supplied whoami as the command parameter; if so, it spins off two sub-tasks, each registering a subtask callback function.

async def create_tasking(self, task: MythicTask) -> MythicTask:
    resp = await MythicRPC().execute("create_artifact", task_id=task.id,
        artifact="/bin/sh -c {}".format(task.args.get_arg("command")),
        artifact_type="Process Create",
    )
    resp = await MythicRPC().execute("create_artifact", task_id=task.id,
        artifact="{}".format(task.args.get_arg("command")),
        artifact_type="Process Create",
    )
    task.display_params = task.args.get_arg("command")
    task.completed_callback_function = self.shell_completed
    if task.display_params == "whoami":
        resp = await MythicRPC().execute("create_subtask", parent_task_id=task.id,
                                         command="pwd", params="",
                                         subtask_callback_function=self.shell_subtask_completed.__name__)
        resp = await MythicRPC().execute("create_subtask", parent_task_id=task.id,
                                         command="shell", params="echo -c 'hi'",
                                         subtask_callback_function=self.shell_subtask_completed.__name__)
    return task

async def shell_completed(self, task: MythicTask, subtask: dict = None, subtask_group_name: str = None) -> MythicTask:
    resp = await MythicRPC().execute("create_output", task_id=task.id,
                                     output="called shell_completed because shell is done!"
                                     )
    return task

async def shell_subtask_completed(self, task: MythicTask, subtask: dict = None,
                          subtask_group_name: str = None) -> MythicTask:
    resp = await MythicRPC().execute("get_responses", task_id=subtask["id"])

    # json.dumps requires "import json" at the top of the command file
    resp = await MythicRPC().execute("create_output", task_id=task.id,
                                     output=f"shell's subtask, {subtask['command']}, is done! Had output of:\n" + json.dumps(resp.response)
                                     )
    return task

When issuing a subtask, we can simply do:

resp = await MythicRPC().execute("create_subtask", parent_task_id=task.id,
        command="pwd", params="",
        subtask_callback_function=self.shell_subtask_completed.__name__)

When supplying the subtask_callback_function for a subtask, you need to supply the NAME of the function. You can either supply it as a string, "shell_subtask_completed", or use Python's built-in __name__ attribute to pull the name automatically from a reference to the function. The function MUST EXIST IN THE SAME CLASS AS YOUR CREATE_TASKING.
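
To see why the two spellings are interchangeable, here is a minimal sketch in plain Python, independent of Mythic. The class `ShellCommand` and the helper `resolve_callback` are hypothetical stand-ins, and the `getattr` lookup is only an assumption about how a name could be mapped back to a method on the same class; it is not taken from Mythic's source.

```python
import asyncio


class ShellCommand:
    """Hypothetical stand-in for a command class; not Mythic's real base class."""

    async def shell_subtask_completed(self, task, subtask=None, subtask_group_name=None):
        return f"{subtask['command']} finished"


def resolve_callback(command, name):
    # Assumed lookup strategy: find the method by name on the same instance.
    # A name that does not exist on the class raises AttributeError.
    return getattr(command, name)


cmd = ShellCommand()
# __name__ on a bound method yields the bare function name as a string.
name_from_attr = cmd.shell_subtask_completed.__name__
name_literal = "shell_subtask_completed"

callback = resolve_callback(cmd, name_from_attr)
result = asyncio.run(callback(task=None, subtask={"command": "pwd"}))
print(name_from_attr == name_literal, result)
```

Using `__name__` rather than a hand-typed string has the practical advantage that a renamed or misspelled method fails immediately with an AttributeError when create_tasking runs.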

In the example above, the shell command spins off a subtask of pwd with no parameters. When that task completes, the function called shell_subtask_completed is executed. This function's first two parameters are:

  • an instance of the shell task (this is the same context, with all the same information, as you'd get in your create_tasking function).

  • a dictionary with the information about the subtask itself. This does NOT include that task's response output. The task could have a lot of output, or it could simply register data within Mythic (like credentials, files, etc.) and not really display anything to the user. So, if you want the actual content that was generated by the task, you need to do another RPC call. We can see this in the function:

resp = await MythicRPC().execute("get_responses", task_id=subtask["id"])

This RPC call gets all of the response data about that subtask. This returns a dictionary with a few fields:

  • files - this is an array of dictionaries about all of the files uploaded/downloaded/registered with that task

  • artifacts - this is an array of dictionaries for all the artifacts generated with the task

  • credentials - this is an array of dictionaries for all the credentials stored from the task

  • user_output - this is an array of dictionaries for all of the output displayed to the user from the task
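
As a small illustration of working with that dictionary, the sketch below counts the entries in each of the four fields listed above. The helper name `summarize_responses` is hypothetical, and the sample data is made up: the inner keys are placeholders, not Mythic's real schema.

```python
def summarize_responses(responses: dict) -> dict:
    """Count the entries in each field named in the list above."""
    fields = ("files", "artifacts", "credentials", "user_output")
    # Treat a missing or empty field as zero entries.
    return {field: len(responses.get(field) or []) for field in fields}


# Hypothetical sample shaped like the dictionary described above.
sample = {
    "files": [],
    "artifacts": [{"placeholder": "a"}, {"placeholder": "b"}],
    "credentials": [],
    "user_output": [{"placeholder": "c"}],
}
summary = summarize_responses(sample)
print(summary)
```

Inside a subtask callback, the dictionary to pass in would presumably be resp.response, as used in the shell_subtask_completed example above.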

Task Callbacks

We just saw an example of how a task registered a callback function for when its subtask enters a "completed" state. A task can also register a completion function for when the task itself enters a completed state. In the following example, the shell task registers a completion function that runs when the shell task itself (not a subtask) completes:

async def create_tasking(self, task: MythicTask) -> MythicTask:
    resp = await MythicRPC().execute("create_artifact", task_id=task.id,
        artifact="/bin/sh -c {}".format(task.args.get_arg("command")),
        artifact_type="Process Create",
    )
    resp = await MythicRPC().execute("create_artifact", task_id=task.id,
        artifact="{}".format(task.args.get_arg("command")),
        artifact_type="Process Create",
    )
    task.display_params = task.args.get_arg("command")
    task.completed_callback_function = self.shell_completed
    return task
    
async def shell_completed(self, task: MythicTask, subtask: dict = None, subtask_group_name: str = None) -> MythicTask:
    resp = await MythicRPC().execute("create_output", task_id=task.id,
                                     output="called shell_completed because shell is done!"
                                     )
    return task

This is useful when you want to do some post-task processing, actions, analysis, etc. when a task completes or errors out. In this example, the shell_completed function simply displays a message to the user that the task is done. In more interesting examples, though, you could use the get_responses RPC call we saw above to get information about all of the files that were downloaded from a task, then process them for potential credentials, interesting data, OCR, etc. and provide that output back to the user.
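
The kind of post-processing mentioned above might look like the following sketch, which scans text for password-like key/value pairs. Everything here is an assumption for illustration: the function name `find_potential_credentials`, the regular expression, and the idea that the text has already been extracted into a list of strings. How that text is pulled out of the get_responses result depends on Mythic's actual response schema, which is not shown here.

```python
import re

# Hypothetical pattern: matches "password=...", "passwd: ...", or "pwd = ..." pairs.
CREDENTIAL_PATTERN = re.compile(r"\b(password|passwd|pwd)\s*[:=]\s*(\S+)", re.IGNORECASE)


def find_potential_credentials(chunks):
    """Return (key, value) pairs for every password-like match in the text chunks."""
    hits = []
    for chunk in chunks:
        for match in CREDENTIAL_PATTERN.finditer(chunk):
            hits.append((match.group(1).lower(), match.group(2)))
    return hits


# Made-up text standing in for content gathered from a completed task.
chunks = ["db_host=10.0.0.5", "Password: hunter2", "nothing here"]
hits = find_potential_credentials(chunks)
print(hits)
```

In a completion callback, results like these could then be reported back to the user with the create_output RPC call shown earlier.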
