With Mythic 3.0, you have two options for your containers - GoLang or Python. This section will only go over the Python side since there are no 2.3 agents with GoLang containers.
When you git clone the new Mythic v3.0.0 you'll notice that there's no mythic-cli binary. To reduce the size of the GitHub clones, this binary is now included as part of the main base docker image, so run sudo make and the binary will be downloaded and copied into the normal spot.
Make sure you update Docker to version 20.10.22 or later (current is 23.0.1). This is required for the latest docker containers to work properly. A simple sudo apt upgrade and install should suffice. Also install docker-compose via sudo apt install docker-compose-plugin rather than the standalone docker-compose script, as the script will soon be deprecated according to Docker.
The ExternalAgent format is still the same. These next pieces are how you can test updates of your agent locally before copying your agent's folder back into your normal ExternalAgent format.
The Mythic/InstalledServices folder is on the Mythic server. When you installed a PayloadType via ./mythic-cli install github <url>, the folder within your github's Payload_Type folder is copied to the Mythic/InstalledServices folder.
If you are doing local agent development instead of on the same server as the Mythic instance, then in the following steps you can treat the Mythic/InstalledServices folder the same as your GitHub's Payload_Type folder (or any other folder really - it's simply serving as a staging ground while you create the new folder structure).
If you're doing local development, you need at least Python version 3.10 because of the new typing features it offers.
Make a directory, agent name, in Mythic/InstalledServices
Copy your entire Payload type's agent name directory into InstalledServices (yes, the path will look like Mythic/InstalledServices/agentName/agentName)
In Mythic/InstalledServices/agentName create a main.py and a Dockerfile
In your new Dockerfile, copy the contents of your old Dockerfile and change the FROM line to FROM itsafeaturemythic/mythic_python_base:latest
In your new main.py add:
import mythic_container
from [agent name].mythic import *
mythic_container.mythic_service.start_and_run_forever()
To make your new agent name directory a Python package that can be imported, create an __init__.py file in Mythic/InstalledServices/agentName/agentName. In the Mythic/InstalledServices/agentName/agentName/mythic folder, make an __init__.py file with the following contents (this will loop through all of your command files and import them automatically):
import glob
import os.path
from pathlib import Path
from importlib import import_module, invalidate_caches
import sys

# Get file paths of all modules.
currentPath = Path(__file__)
searchPath = currentPath.parent / "agent_functions" / "*.py"
modules = glob.glob(f"{searchPath}")
invalidate_caches()
for x in modules:
    if not x.endswith("__init__.py") and x[-3:] == ".py":
        module = import_module(f"{__name__}.agent_functions." + Path(x).stem)
        for el in dir(module):
            if "__" not in el:
                globals()[el] = getattr(module, el)

sys.path.append(os.path.abspath(currentPath.name))
In your Mythic/InstalledServices/agentName/agentName/mythic/agent_functions files, replace every mythic_payloadtype_container with mythic_container.
If you have an import like from agent_functions.execute_pe import PRINTSPOOFER_FILE_ID which references another command file, update it to from .execute_pe import PRINTSPOOFER_FILE_ID. If you include a local library at the same level as agent_functions, you can import it like from [agent name].mythic.[package] import [thing]
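The two import changes above are mechanical, so they can be scripted. The following is a minimal sketch, not part of Mythic: migrate_source is a hypothetical helper, and the sample input lines are illustrative. It renames the package and turns sibling imports from agent_functions into relative imports.

```python
import re

# Hypothetical helper, not part of Mythic: rewrites 2.3-style imports in a file's text.
OLD_PACKAGE = "mythic_payloadtype_container"
NEW_PACKAGE = "mythic_container"
# Matches "from agent_functions." at the start of a line, keeping any indentation.
SIBLING_IMPORT = re.compile(r"^([ \t]*)from agent_functions\.", re.MULTILINE)


def migrate_source(text: str) -> str:
    """Return text with the package renamed and sibling imports made relative."""
    text = text.replace(OLD_PACKAGE, NEW_PACKAGE)
    # "from agent_functions.execute_pe import X" becomes "from .execute_pe import X"
    return SIBLING_IMPORT.sub(r"\1from .", text)


# Illustrative 2.3-style command file header.
old = (
    "from mythic_payloadtype_container.MythicCommandBase import *\n"
    "from agent_functions.execute_pe import PRINTSPOOFER_FILE_ID\n"
)
print(migrate_source(old))
```

Review the result by hand before committing it; imports of a local library at the same level as agent_functions still need the [agent name].mythic.[package] form described above.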
If you're doing local development, you'll need a rabbitmq_config.json file at the same level as your main.py to tell your service where Mythic is located and the rabbitmq password. The configuration options you can supply can be found in the Local Development section.
There are some changes to the rabbitmq_config.json file keys:
container_files_path is no longer used and can be deleted.
username is no longer used and can be deleted.
password is now rabbitmq_password.
host is now rabbitmq_host.
name is no longer used and can be deleted.
virtual_host is no longer used and can be deleted.
Required keys in rabbitmq_config.json are:
rabbitmq_host - points to the IP where Mythic lives
rabbitmq_password - the password used to authenticate to rabbitmq
mythic_server_host - points to the IP where Mythic lives
mythic_server_port - if you're using something other than the default (this is NOT the 7443 that you use for the UI)
mythic_server_grpc_port - if you're using something other than the default
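The key changes listed above amount to a rename-and-drop pass over the old file. Here is a minimal sketch of that pass; migrate_rabbitmq_config is a hypothetical helper, every value is a placeholder, and setting mythic_server_host to the same address as rabbitmq_host is an assumption that only holds when both live on the same machine.

```python
import json

# Key changes described above: renamed keys map old -> new, dropped keys are removed.
RENAMED = {"password": "rabbitmq_password", "host": "rabbitmq_host"}
DROPPED = {"container_files_path", "username", "name", "virtual_host"}


def migrate_rabbitmq_config(old: dict) -> dict:
    """Translate a Mythic 2.3 rabbitmq_config.json dict to the 3.0 key names."""
    new = {}
    for key, value in old.items():
        if key in DROPPED:
            continue
        new[RENAMED.get(key, key)] = value
    return new


# Illustrative 2.3-style values; the address and password are placeholders.
old_config = {
    "username": "mythic_user",
    "password": "example-password",
    "virtual_host": "mythic_vhost",
    "host": "127.0.0.1",
    "name": "myagent",
    "container_files_path": "/Mythic/",
}
new_config = migrate_rabbitmq_config(old_config)
# mythic_server_host is new in 3.0; assumed here to match rabbitmq_host.
new_config.setdefault("mythic_server_host", new_config["rabbitmq_host"])
print(json.dumps(new_config, indent=2))
```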
Now to actually update the content of your builder/command files. There's not much you need to do.
builder.py agent definition
Because the new structure treats your entire agent directory as a Python package, the container no longer knows the paths for things. This gives you a lot more freedom in how you want to organize your code, but does require you to specify where things are located. In your builder.py file where you define your Payload Type, you need to add the following:
The agent_path is the path to your general agent structure (typically with the agent_functions as a sub-folder). The agent_code_path points to your agent's actual code.
Something that's a little different is the agent icons - the agents will sync that over automatically with the rest of their definition (no more having to copy it over manually or get it from an install). What that means though is you either need to supply agent_icon_path and provide the path to your agent's svg icon or specify agent_icon_bytes and provide the raw bytes for your icon.
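As a rough illustration of the three path attributes named above, the sketch below builds them with pathlib. The agent name "myagent", the agent_code folder name, and the icon location are all assumptions made for this example; in a real builder.py these values would be set as attributes on your Payload Type definition.

```python
from pathlib import Path

# "myagent" is a placeholder agent name; substitute your own folder name.
AGENT_NAME = "myagent"

# Paths are relative to the folder that holds main.py
# (Mythic/InstalledServices/myagent in the steps above).
agent_path = Path(".") / AGENT_NAME / "mythic"
agent_code_path = Path(".") / AGENT_NAME / "agent_code"  # assumed folder name
agent_icon_path = agent_path / "agent_functions" / f"{AGENT_NAME}.svg"  # assumed location

print(agent_path, agent_code_path, agent_icon_path, sep="\n")
```

Relative paths like these resolve against the working directory of the process, so they only work when the service is started from the folder containing main.py.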
build
In your payload type's build function you can report back on build steps via the SendMythicRPCPayloadUpdateBuildStep RPC call (based on your defined build steps). This will update the UI step-by-step for the operator so they know what's going on.
You can also set UpdatedFilename (or updated_filename for Python) in your build response and adjust the final filename of the payload. This can be helpful if your payload type allows you to build to various outputs (exe, dll, dylib, binary, etc). This allows you to adjust the filename based on that so that when the user clicks "download" in the UI, they get the right file and don't have to change the filename.
Browser scripts
Browser scripts work just the same, but they will look for their code at agent_path / browser_scripts / filename.js OR at the path specified by the name parameter for the script. That means you can either specify the name as test.js and have it located in your agent_path / browser_scripts / test.js file, or specify a full path as your name.
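The two lookup locations can be pictured with a small resolver. This is an illustration only: resolve_browser_script is a hypothetical function, and the order in which the real container checks the two locations is an assumption.

```python
import tempfile
from pathlib import Path


def resolve_browser_script(agent_path: Path, name: str) -> Path:
    """Pick a script location using the two options described above.

    Illustration only; the lookup order inside the real container may differ.
    """
    candidate = agent_path / "browser_scripts" / name
    if candidate.exists():
        return candidate
    return Path(name)  # otherwise treat the name as a full path


# Demonstration against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    demo_agent_path = Path(tmp)
    (demo_agent_path / "browser_scripts").mkdir()
    (demo_agent_path / "browser_scripts" / "test.js").write_text("// script body")
    found = resolve_browser_script(demo_agent_path, "test.js")
    found_in_agent_folder = found == demo_agent_path / "browser_scripts" / "test.js"
    fallback = resolve_browser_script(demo_agent_path, "/opt/scripts/other.js")

print(found_in_agent_folder, fallback)
```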
The browser_script attribute is a single BrowserScript value, not an array. This is because the entire Python back-end is gone, so there's no more need to supply a script for the old UI and the new UI.
c2 profile parameters when building
When looping through c2 profile parameters, arrays are now actual arrays, and crypto types and dictionary types are dictionaries, so check parameters by name rather than by value type. A bunch of agents simply check if the supplied value is a dictionary and then automatically try to pull out certain values, but that is no longer safe. For example, when looping through the http profile, both the AESPSK and the headers parameters will be passed in as dictionaries.
for key, val in c2.get_parameters_dict().items():
    if key == "AESPSK":
        c2_code = c2_code.replace(key, val["enc_key"] if val["enc_key"] is not None else "")
    elif not isinstance(val, str):
        c2_code = c2_code.replace(key, json.dumps(val))
    else:
        c2_code = c2_code.replace(key, val)
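To see the loop above in action without a running Mythic instance, the sketch below wraps it in a function and feeds it a hand-written dictionary. The parameter values and the template text are hypothetical stand-ins for c2.get_parameters_dict() and your agent's source code.

```python
import json


def stamp_c2_parameters(c2_code: str, parameters: dict) -> str:
    """Replace each parameter name found in c2_code with its configured value."""
    for key, val in parameters.items():
        if key == "AESPSK":
            # Crypto parameters arrive as a dictionary; only the key material is stamped in.
            c2_code = c2_code.replace(key, val["enc_key"] if val["enc_key"] is not None else "")
        elif not isinstance(val, str):
            # Arrays and dictionaries are serialized as JSON text.
            c2_code = c2_code.replace(key, json.dumps(val))
        else:
            c2_code = c2_code.replace(key, val)
    return c2_code


# Hypothetical values standing in for c2.get_parameters_dict().
parameters = {
    "AESPSK": {"enc_key": "c2VjcmV0", "dec_key": "c2VjcmV0"},
    "headers": {"User-Agent": "example"},
    "callback_host": "https://example.com",
}
template = "KEY=AESPSK\nHEADERS=headers\nHOST=callback_host"
stamped = stamp_c2_parameters(template, parameters)
print(stamped)
```

Because this is plain string replacement, a parameter name that also appears elsewhere in the template is replaced there too, so placeholder names should be unique within the agent code.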
Updated Create Tasking - create_go_tasking
The current create_tasking functions still work just like normal; however, the newer create_go_tasking function gives you more contextual data and mirrors the data structures from the new Golang container version.
Tasks can specify for a certain function to execute when the task finishes executing. That hasn't changed. However, the format of how you define it has changed slightly. Before, you'd simply pass the name of a function and the container would loop through all known function definitions looking for one that matched. That's not super great, so now you define a dictionary of function name to function as part of your command definition.
The PTTaskCompletionFunctionMessage and response classes can be found in the PyPi code and auto-completed via IDEs. This syntax is just the Python way of saying that the format is:
async def functionName(myArg: PTTaskCompletionFunctionMessage) -> PTTaskCompletionFunctionMessageResponse:
    # do something here
    pass
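The difference between scanning for a function by name and looking it up in a declared dictionary can be sketched with the standard library alone. Everything here is a stand-in: CompletionMessage and CompletionResponse replace the real message classes, their fields are invented, and completion_functions and run_completion are hypothetical names for the mapping and the dispatch step.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CompletionMessage:  # stand-in for PTTaskCompletionFunctionMessage
    task_id: int  # hypothetical field


@dataclass
class CompletionResponse:  # stand-in for PTTaskCompletionFunctionMessageResponse
    success: bool  # hypothetical field


async def functionName(myArg: CompletionMessage) -> CompletionResponse:
    # A real handler would inspect the finished task here.
    return CompletionResponse(success=True)


# Name-to-function mapping declared alongside the command definition.
completion_functions = {"functionName": functionName}


async def run_completion(name: str, message: CompletionMessage) -> CompletionResponse:
    """Look the handler up by name; an unknown name fails instead of being guessed."""
    handler = completion_functions.get(name)
    if handler is None:
        return CompletionResponse(success=False)
    return await handler(message)


result = asyncio.run(run_completion("functionName", CompletionMessage(task_id=1)))
missing = asyncio.run(run_completion("unknown", CompletionMessage(task_id=1)))
print(result, missing)
```

An explicit mapping keeps the set of callable functions visible in one place and avoids accidental matches against unrelated functions that happen to share a name.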
To leverage this new functionName function as part of your tasking, in your create_tasking function you need to set the name:
Sending back data via the process_response key within your responses allows you to hook into the associated command's process_response function within your Payload Type's container. The format of this function has changed slightly:
which is to say that the function is pre-defined (one per command parameter) and looks like:
async def dynamic_query_function(myArg: PTRPCDynamicQueryFunctionMessage) -> PTRPCDynamicQueryFunctionMessageResponse:
    # do something
    pass
Command OPSEC
The opsec functionality has been removed from a special CommandOPSEC class and moved to the main command class itself. So, your command can have two additional functions:
The RPC call to start SOCKS is no longer control_socks. Instead, you'll use the SendMythicRPCProxyStart and SendMythicRPCProxyStop commands as detailed here.
C2 Profiles
C2 profiles also need to be updated for Mythic 3.0.0, in an extremely similar way to Payload Types.
Make a directory, c2 name, in Mythic/InstalledServices
Copy your entire C2 Profile's c2 name directory into InstalledServices (yes, the path will look like Mythic/InstalledServices/c2Name/c2Name)
Remove c2_service.sh, mythic_service.py, and rabbitmq_config.json from your mythic folder
Remove C2_RPC_Functions.py
In Mythic/InstalledServices/c2Name create a main.py and a Dockerfile
In your new Dockerfile, copy the contents of your old Dockerfile and change the FROM line to FROM itsafeaturemythic/mythic_python_base:latest
In your mythic/c2_functions/ folder, your definition file should import mythic_container instead of mythic_c2_container (similar to what we did for agent updates).
In your new main.py add:
import mythic_container
from [c2 name].mythic import *
mythic_container.mythic_service.start_and_run_forever()
In your c2 profile definition, add in two more attributes - server_folder_path (path to the folder where your server binary and config.json files exist), and server_binary_path (path to the binary to execute if you're doing an egress c2 profile and not a p2p profile).
To see what this looks like all together, look at the websocket example here: https://github.com/MythicMeta/ExampleContainers/tree/main/Payload_Type/python_services. You'll notice that the websocket is just one of multiple services that the single docker container is offering. If you want your container to only offer that one, then you can remove the other folders and adjust your main.py accordingly.
Keys in the C2 Profile Parameter Type Dictionary will be sorted alphabetically - they will NOT maintain the order they were specified in the UI. This is currently a limitation of the Golang Google JSON library.
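The effect on a dictionary parameter can be previewed locally. In this sketch, sort_keys=True is an assumption standing in for the alphabetical ordering that Go's JSON encoder applies to map keys; it is not Mythic's actual code path, and the header values are placeholders.

```python
import json

# Headers as an operator might enter them in the UI, in this order.
entered = {"User-Agent": "example", "Host": "example.com", "Accept": "*/*"}

# Assumption: sort_keys=True mimics the ordering applied on the Go side.
received = json.loads(json.dumps(entered, sort_keys=True))

print(list(entered))   # order as entered
print(list(received))  # alphabetical order
```

If your agent depends on header order, encode that order explicitly (for example, in a separate array parameter) rather than relying on dictionary ordering.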
Env Settings
Mythic provides a MYTHIC_ADDRESS environment variable that points to http://mythic_server:17443/agent_message for C2 Profiles to use for forwarding their messages. With Mythic 3.0+, there are going to be more options for connections outside of a static HTTP endpoint. Therefore, the MYTHIC_ADDRESS field still exists, but there are additional values, MYTHIC_SERVER_HOST and MYTHIC_SERVER_PORT, so that the address can be built dynamically later on.
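A C2 profile server could prefer the host and port values and fall back to the static address. The sketch below shows one way to do that; mythic_agent_message_url is a hypothetical helper, and it assumes the http scheme and the /agent_message path continue to apply when the URL is built from the separate values.

```python
import os


def mythic_agent_message_url(env=None) -> str:
    """Build the forwarding URL from the environment, preferring host and port."""
    env = os.environ if env is None else env
    host = env.get("MYTHIC_SERVER_HOST")
    port = env.get("MYTHIC_SERVER_PORT")
    if host and port:
        # Assumption: same scheme and path as the static MYTHIC_ADDRESS value.
        return f"http://{host}:{port}/agent_message"
    return env.get("MYTHIC_ADDRESS", "http://mythic_server:17443/agent_message")


print(mythic_agent_message_url({"MYTHIC_SERVER_HOST": "10.0.0.5", "MYTHIC_SERVER_PORT": "17443"}))
```

Passing the environment as an argument keeps the function easy to test; calling it with no argument reads the real process environment.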
Translation Containers
Translation containers are no different than C2 Profiles and Payload Types for the new format of things. Look to translator in the ExampleContainers (https://github.com/MythicMeta/ExampleContainers/tree/main/Payload_Type/python_services) repository for an example of how to format your new structure. Translation containers boil down to one class definition with a few functions.
One big change from Mythic 2.3 -> 3.0 for Translation Containers is that they now operate over gRPC instead of RabbitMQ. This means that they need to access the gRPC port on the Mythic Server if you intend on running a translation container on a separate host from Mythic itself. This port is configurable in the Mythic/.env file, but by default it's 17444. This change to gRPC instead of RabbitMQ for the translation container messages speeds things up and reduces the burden on RabbitMQ for transmitting potentially large messages.
Some additional notes about Translation container message updates:
Although Mythic 3 will base64 decode a message before providing it to translate_from_c2_format, Mythic 3 will not base64 encode the result of translate_to_c2_format. You still need to do that yourself, as you did for Mythic 2.3.
Mythic 2.3 allowed the UUID prefix on custom agent messages to be a little-endian encoded 16-byte value. In Mythic 3, any 16-byte UUID prefix needs to be big-endian encoded.
Mythic 2.3 required the translation container to base64 encode/decode inputs and outputs for generate_keys. Mythic 2.3 would directly use that base64 data to populate enc_key or dec_key values for building and would provide that base64 data directly to any translate_to_c2_format and translate_from_c2_format functions.
Mythic 3 expects generate_keys to provide the keys as byte arrays. Mythic 3 will base64 encode/decode the byte arrays when populating any enc_key or dec_key value for an agent configuration, but will use the byte array when calling any translate_to_c2_format and translate_from_c2_format function.
Mythic 2.3 would provide the entire message as input to translate_from_c2_format. Mythic 3 provides the message minus any UUID prefix.
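The byte-order and base64 notes above can be illustrated with the standard library. This is a sketch under assumptions: frame_for_c2 is a hypothetical helper, the UUID and body are made-up values, and placing a 16-byte prefix directly in front of the body is an assumed framing for an agent that uses binary UUID prefixes.

```python
import base64
import uuid


def frame_for_c2(agent_uuid: str, body: bytes) -> bytes:
    """Prefix body with a 16-byte big-endian UUID, then base64 encode the result."""
    # UUID.bytes is the big-endian form; UUID.bytes_le is the little-endian variant.
    prefix = uuid.UUID(agent_uuid).bytes
    return base64.b64encode(prefix + body)


example_uuid = "01234567-89ab-cdef-0123-456789abcdef"  # made-up value
framed = frame_for_c2(example_uuid, b"hello")
raw = base64.b64decode(framed)

print(raw[:16].hex())                         # big-endian prefix
print(uuid.UUID(example_uuid).bytes_le.hex())  # little-endian form, for comparison
```

The two encodings differ only in the first three UUID fields, so a prefix that looks almost right but fails to match a known callback is a hint that the wrong byte order was used.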
from abc import abstractmethod, ABCMeta
import json
from enum import Enum
import base64
import uuid
from pathlib import Path
from .logging import logger
import sys
from collections.abc import Callable, Awaitable
from mythic_container.MythicGoRPC.send_mythic_rpc_payload_create_from_scratch import \
    MythicRPCPayloadConfigurationBuildParameter, MythicRPCPayloadConfigurationC2Profile


class SupportedOS:
    """Supported Operating System

    This OS value is selected first when generating a payload or wrapper.
    If you don't want to use a listed value, supply your own with
        SupportedOS("my os")
    """
    Windows = "Windows"
    MacOS = "macOS"
    Linux = "Linux"
    WebShell = "WebShell"
    Chrome = "Chrome"

    def __init__(self, os: str):
        self.os = os

    def __str__(self):
        return self.os


class MythicStatus:
    Error = "error"
    Completed = "completed"
    Processed = "processed"
    Processing = "processing"
    Preprocessing = "preprocessing"
    Delegating = "delegating subtasks"
    CallbackError = "task callback error"
    Success = "success"

    def __init__(self, status: str):
        self.status = status

    def __str__(self):
        return self.status

    def __eq__(self, obj):
        # check if self.status == obj
        if isinstance(obj, str):
            return self.status == obj
        elif isinstance(obj, MythicStatus):
            return self.status == obj.status
        else:
            return False


class ParameterType(str, Enum):
    """Types of parameters available for Commands

    File type will be a file's UUID in your parse_arguments and create_go_tasking functions.
    Credential_JSON will be a dictionary of credential attributes (account, realm, credential, comment).
    PayloadList will be a list of payloads to the user to select via a dropdown option, but will be passed back as a payload's UUID.
    ConnectionInfo is a dictionary of Payload/C2 information to help make P2P connections easier for an agent.
    LinkInfo is a dictionary of the same information as ConnectionInfo, but presented to the user as a dropdown of choices.
    TypedArray is an array of tuples that represent an element's type and its value: [ ("string", "abc"), ("int", "5") ]
    """
    String = "String"
    Boolean = "Boolean"
    File = "File"
    FileMultiple = "FileMultiple"
    Array = "Array"
    ChooseOne = "ChooseOne"
    ChooseOneCustom = "ChooseOneCustom"
    ChooseMultiple = "ChooseMultiple"
    Credential_JSON = "CredentialJson"
    Number = "Number"
    Payload = "PayloadList"
    ConnectionInfo = "AgentConnect"
    LinkInfo = "LinkInfo"
    TypedArray = "TypedArray"


class CommandAttributes:
    """Metadata attributes about a command

    These attributes help determine which commands can be loaded in, which ones are always included, and even free-form attributes the developer wants to track.
    filter_by_build_parameter is of the form {"build param name": "build param value"}

    Attributes:
        supported_os (list[SupportedOS]): Which operating systems does this command support? An empty list means all OS.
        builtin (bool): Is this command baked into the agent permanently?
        suggested_command (bool): If true, this command will appear on the "included" side when building your payload by default.
        load_only (bool): If true, this command can only be loaded after you have a callback and not included in the base payload.
        filter_by_build_parameter (dict): Specify if this command is allowed to be built into the payload or not based on build parameters the user specifies.

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(self,
                 spawn_and_injectable: bool = False,
                 supported_os: [SupportedOS] = None,
                 builtin: bool = False,
                 suggested_command: bool = False,
                 load_only: bool = False,
                 filter_by_build_parameter: dict = {},
                 **kwargs):
        self.spawn_and_injectable = spawn_and_injectable
        self.supported_os = supported_os
        self.builtin = builtin
        self.suggested_command = suggested_command
        self.load_only = load_only
        self.filter_by_build_parameter = filter_by_build_parameter
        self.additional_items = {}
        for k, v in kwargs.items():
            self.additional_items[k] = v

    def to_json(self):
        r = {}
        if self.spawn_and_injectable is not None:
            r["spawn_and_injectable"] = self.spawn_and_injectable
        else:
            r["spawn_and_injectable"] = False
        if self.supported_os is not None:
            r["supported_os"] = [str(x) for x in self.supported_os]
        else:
            r["supported_os"] = []
        r["builtin"] = self.builtin
        r["suggested_command"] = self.suggested_command
        r["load_only"] = self.load_only
        r["filter_by_build_parameter"] = self.filter_by_build_parameter
        r["additional_items"] = self.additional_items
        return r

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)


class ParameterGroupInfo:
    """Information about a group of parameters for a command.

    ParameterGroups allow conditional parameters displayed to the user.

    Attributes:
        required (bool): In this parameter group, is this parameter required?
        group_name (str): The name of the parameter group? If one isn't provided, the default is `Default`
        ui_position (int): For this parameter group, which order should the parameters appear in the UI?

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(
            self,
            required: bool = True,
            group_name: str = "Default",
            ui_position: int = None,
            **kwargs
    ):
        self.required = required
        self.group_name = group_name
        self.ui_position = ui_position
        self.additional_info = {}
        for k, v in kwargs.items():
            self.additional_info[k] = v

    def to_json(self):
        r = {"required": self.required, "group_name": self.group_name, "ui_position": self.ui_position,
             "additional_info": self.additional_info}
        return r

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)


class PTRPCDynamicQueryFunctionMessage:
    """Request to dynamically generate choices for a modal in the UI with a ChooseOne or ChooseMultiple parameter type.

    Attributes:
        Command (str): Name of the command
        ParameterName (str): Name of the parameter
        PayloadType (str): Name of the PayloadType
        CommandPayloadType (str): Name of the payload type associated with this command
        Callback (int): ID of the Callback where this function is called. This can be used for RPC calls to Mythic
        CallbackDisplayID (int): Display ID of the callback where this function is called
        AgentCallbackID (str): Agent UUID of the callback where this function is called
        PayloadOS (str): The selected OS when the backing payload for this callback was created
        PayloadUUID (str): The UUID of the backing payload for this callback that can be used to fetch more information
        Secrets (dict): User secrets based on the operator that issued this action

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(self,
                 command: str,
                 parameter_name: str,
                 payload_type: str,
                 command_payload_type: str,
                 callback: int,
                 payload_os: str = "",
                 payload_uuid: str = "",
                 agent_callback_id: str = "",
                 callback_display_id: int = 0,
                 secrets: dict = {},
                 **kwargs
                 ):
        self.Command = command
        self.CommandPayloadType = command_payload_type
        self.ParameterName = parameter_name
        self.PayloadType = payload_type
        self.Callback = callback
        self.PayloadOS = payload_os
        self.PayloadUUID = payload_uuid
        self.AgentCallbackID = agent_callback_id
        self.CallbackDisplayID = callback_display_id
        self.Secrets = secrets

    def to_json(self):
        return {
            "command": self.Command,
            "parameter_name": self.ParameterName,
            "payload_type": self.PayloadType,
            "command_payload_type": self.CommandPayloadType,
            "callback": self.Callback,
            "payload_os": self.PayloadOS,
            "payload_uuid": self.PayloadUUID,
            "agent_callback_id": self.AgentCallbackID,
            "callback_display_id": self.CallbackDisplayID,
            "secrets": self.Secrets
        }

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)


class PTRPCDynamicQueryFunctionMessageResponse:
    """Results of performing a dynamic query for a command

    Attributes:
        Success (bool): Did the dynamic query function successfully execute
        Error (str): If the dynamic query function failed to run, this is the string error
        Choices (list[str]): List of the string choices to present back to the user. If there are no valid choices, return []

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(self,
                 Success: bool = False,
                 Error: str = None,
                 Choices: list[str] = []):
        self.Success = Success
        self.Error = Error
        self.Choices = Choices

    def to_json(self):
        return {
            "success": self.Success,
            "error": self.Error,
            "choices": self.Choices
        }

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)


class PTRPCTypedArrayParseFunctionMessage:
    """Request to parse CLI provided input for the TypedArray parameter type.

    Attributes:
        Command (str): Name of the command
        ParameterName (str): Name of the parameter
        PayloadType (str): Name of the PayloadType
        CommandPayloadType (str): Name of the payload type for the command issued
        Callback (int): ID of the Callback where this function is called. This can be used for RPC calls to Mythic
        InputArray (list[str]): List of strings that the user supplied for the TypedArray parameter type

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(self,
                 command: str,
                 parameter_name: str,
                 payload_type: str,
                 command_payload_type: str,
                 callback: int,
                 input_array: list[str]):
        self.Command = command
        self.ParameterName = parameter_name
        self.PayloadType = payload_type
        self.CommandPayloadType = command_payload_type
        self.Callback = callback
        self.InputArray = input_array

    def to_json(self):
        return {
            "command": self.Command,
            "parameter_name": self.ParameterName,
            "payload_type": self.PayloadType,
            "command_payload_type": self.CommandPayloadType,
            "callback": self.Callback,
            "input_array": self.InputArray
        }

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)


class PTRPCTypedArrayParseFunctionMessageResponse:
    """Results of parsing the user's input array into an array of tuples for the TypedArray parameter type

    Attributes:
        Success (bool): Did the dynamic query function successfully execute
        Error (str): If the dynamic query function failed to run, this is the string error
        TypedArray (list[tuple]): List of tuples (type, string) for the TypedArray parameter type

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(self,
                 Success: bool = False,
                 Error: str = None,
                 TypedArray: list[list] = []):
        self.Success = Success
        self.Error = Error
        self.TypedArray = TypedArray

    def to_json(self):
        return {
            "success": self.Success,
            "error": self.Error,
            "typed_array": self.TypedArray
        }

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)


class CommandParameter:
    """Definition of an argument for a command

    Attributes:
        name (str): Name of the parameter. This is used for get_arg, set_arg, etc functions and used when making the final JSON string to send down to the callback
        type (ParameterType): The type of parameter this is. This determines valid values and how things are presented to the user in the UI.
        display_name (str): More verbose name displayed to the user in the UI
        cli_name (str): More simplified name used when using tab-complete on the command line inside Mythic's UI
        description (str): Description of the argument displayed when the user hovers over the name of the argument.
        choices (list[str]): Choices for the user to select if the type is ChooseOne, ChooseMultiple, or TypedArray
        default_value (any): Default value to populate for the user or to select if the user didn't provide anything (such as not using the modal popup).
        supported_agents (list[str]): When using the "Payload" Parameter Type, you can filter down which payloads are presented to the operator based on this list of supported agents.
        supported_agent_build_parameters (dict): When using the "Payload" Parameter Type, you can filter down which payloads are presented to the operator based on specific build parameters for specific payload types.
        choice_filter_by_command_attributes (dict): When using the ChooseOne or ChooseMultiple Parameter type along with choices_are_all_commands, you can filter down those options based on attribute values in your command's CommandAttributes field.
        choices_are_all_commands (bool): Can be used with ChooseOne or ChooseMultiple Parameter Types to automatically populate those options in the UI with all of the commands for the payload type.
        choices_are_loaded_commands (bool): Can be used with ChooseOne or ChooseMultiple Parameter Types to automatically populate those options in the UI with all of the currently loaded commands.
        parameter_group_info (list[ParameterGroupInfo]): Define 0+ different parameter groups that this parameter belongs to.
        dynamic_query_function: Provide a dynamic query function to be called when the user views that parameter option in the UI to populate choices for the ChooseOne or ChooseMultiple Parameter Types.
        limit_credentials_by_type (list[str]): List of supported credential types when parameter type is CredentialJson. Blank would allow all credential types.

    Functions:
        to_json(self): return dictionary form of class
    """

    def __init__(
            self,
            name: str,
            type: ParameterType,
            display_name: str = None,
            cli_name: str = None,
            description: str = "",
            choices: [any] = None,
            default_value: any = None,
            validation_func: callable = None,
            value: any = None,
            supported_agents: [str] = None,
            supported_agent_build_parameters: dict = None,
            choice_filter_by_command_attributes: dict = None,
            choices_are_all_commands: bool = False,
            choices_are_loaded_commands: bool = False,
            dynamic_query_function: Callable[
                [PTRPCDynamicQueryFunctionMessage], Awaitable[PTRPCDynamicQueryFunctionMessageResponse]] = None,
            typedarray_parse_function: Callable[
                [PTRPCTypedArrayParseFunctionMessage], Awaitable[PTRPCTypedArrayParseFunctionMessageResponse]] = None,
            parameter_group_info: [ParameterGroupInfo] = None,
            limit_credentials_by_type: [str] = None,
    ):
        self.name = name
        if display_name is None:
            self.display_name = name
        else:
            self.display_name = display_name
        if cli_name is None:
            self.cli_name = name
        else:
            self.cli_name = cli_name
        self.type = type
        self.user_supplied = False  # keep track of if this is using the default value or not
        self.description = description
        if choices is None:
            self.choices = []
        else:
            self.choices = choices
        self.validation_func = validation_func
        if value is None:
            self._value = default_value
        else:
            self.value = value
        self.default_value = default_value
        self.supported_agents = supported_agents if supported_agents is not None else []
        self.supported_agent_build_parameters = supported_agent_build_parameters if supported_agent_build_parameters is not None else {}
        self.choice_filter_by_command_attributes = choice_filter_by_command_attributes if choice_filter_by_command_attributes is not None else {}
        self.choices_are_all_commands = choices_are_all_commands
        self.choices_are_loaded_commands = choices_are_loaded_commands
        self.dynamic_query_function = dynamic_query_function
        if not callable(dynamic_query_function) and dynamic_query_function is not None:
            raise Exception("dynamic_query_function is not callable")
        self.typedarray_parse_function = typedarray_parse_function
        if not callable(typedarray_parse_function) and typedarray_parse_function is not None:
            raise Exception("typedarray_parse_function is not callable")
        self.parameter_group_info = parameter_group_info
        if self.parameter_group_info is None:
            self.parameter_group_info = [ParameterGroupInfo()]
        self.limit_credentials_by_type = limit_credentials_by_type

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def type(self):
        return self._type

    @type.setter
    def type(self, type):
        self._type = type

    @property
    def description(self):
        return self._description

    @description.setter
    def description(self, description):
        self._description = description

    @property
    def choices(self):
        return self._choices

    @choices.setter
    def choices(self, choices):
        self._choices = choices

    @property
    def validation_func(self):
        return self._validation_func

    @validation_func.setter
    def validation_func(self, validation_func):
        self._validation_func = validation_func

    @property
    def supported_agents(self):
        return self._supported_agents

    @supported_agents.setter
    def supported_agents(self, supported_agents):
        self._supported_agents = supported_agents

    @property
    def limit_credentials_by_type(self):
        return self._limit_credentials_by_type

    @limit_credentials_by_type.setter
    def limit_credentials_by_type(self, limit_credentials_by_type):
        self._limit_credentials_by_type = limit_credentials_by_type

    @property
    def supported_agent_build_parameters(self):
        return self._supported_agent_build_parameters

    @supported_agent_build_parameters.setter
    def supported_agent_build_parameters(self, supported_agent_build_parameters):
        self._supported_agent_build_parameters = supported_agent_build_parameters

    @property
    def dynamic_query_function(self) -> Callable[
        [PTRPCDynamicQueryFunctionMessage], Awaitable[PTRPCDynamicQueryFunctionMessageResponse]]:
        return self._dynamic_query_func

    @dynamic_query_function.setter
    def dynamic_query_function(self, dynamic_query_func):
        self._dynamic_query_func = dynamic_query_func

    @property
    def typedarray_parse_function(self) -> Callable[
        [PTRPCTypedArrayParseFunctionMessage], Awaitable[PTRPCTypedArrayParseFunctionMessageResponse]]:
        return self._typedarray_parse_function

    @typedarray_parse_function.setter
    def typedarray_parse_function(self, typedarray_parse_function):
        self._typedarray_parse_function = typedarray_parse_function

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, value):
        if value is not None:
            type_validated = TypeValidators().validate(self.type, value)
            if self.validation_func is not None:
                try:
                    self.validation_func(type_validated)
                    self._value = type_validated
                except Exception as e:
                    raise ValueError(
                        "Failed validation check for parameter {} with value {}".format(
                            self.name, str(value)
                        )
                    )
                self.user_supplied = True
                return
            else:
                # now we do some verification ourselves based on the type
                self._value = type_validated
                self.user_supplied = True
                return
        self._value = value
        self.user_supplied = True
        return

    @property
    def parameter_group_info(self):
        return self._parameter_group_info

    @parameter_group_info.setter
    def parameter_group_info(self, parameter_group_info):
        self._parameter_group_info = parameter_group_info

    def to_json(self):
        return {
            "name": self._name,
            "display_name": self.display_name,
            "cli_name": self.cli_name.replace(" ", "-"),
            "parameter_type": self._type.value,
            "description": self._description,
            "choices": self._choices,
            "default_value": self.default_value,
            "value": self.value,
            "supported_agents": self._supported_agents,
            "supported_agent_build_parameters": self._supported_agent_build_parameters,
            "choices_are_loaded_commands": self.choices_are_loaded_commands,
            "choices_are_all_commands": self.choices_are_all_commands,
            "choice_filter_by_command_attributes": self.choice_filter_by_command_attributes,
            "dynamic_query_function": self.dynamic_query_function.__name__ if callable(
                self.dynamic_query_function) else None,
            "parameter_group_info": [x.to_json() for x in
                                     self.parameter_group_info] if self.parameter_group_info is not None else [
                ParameterGroupInfo().to_json()],
            "limit_credentials_by_type": self._limit_credentials_by_type if self._limit_credentials_by_type is not None else []
        }

    def __str__(self):
        return json.dumps(self.to_json(), sort_keys=True, indent=2)
class TypeValidators:
    def validateString(self, val):
        return str(val)

    def validateNumber(self, val):
        # prefer an int, fall back to a float
        try:
            return int(val)
        except Exception:
            return float(val)

    def validateBoolean(self, val):
        if isinstance(val, bool):
            return val
        else:
            raise ValueError("Value isn't bool")

    def validateFile(self, val):
        try:  # check if the file is actually a file-id
            uuid_obj = uuid.UUID(val, version=4)
            return str(uuid_obj)
        except ValueError:
            pass
        # not a file-id, so treat the value as base64-encoded file contents
        return base64.b64decode(val)

    def validateArray(self, val):
        if isinstance(val, list):
            return val
        else:
            raise ValueError("value isn't array")

    def validateCredentialJSON(self, val):
        if isinstance(val, dict):
            return val
        else:
            raise ValueError("value isn't a dictionary")

    def validatePass(self, val):
        return val

    def validateChooseMultiple(self, val):
        if isinstance(val, list):
            return val
        else:
            raise ValueError("Choices aren't in a list")

    def validatePayloadList(self, val):
        return str(uuid.UUID(val, version=4))

    def validateAgentConnect(self, val):
        if isinstance(val, dict):
            return val
        else:
            raise ValueError("Not instance of dictionary")

    def validateTypedArray(self, val):
        if isinstance(val, list):
            if len(val) > 0 and (not isinstance(val[0], tuple) and not isinstance(val[0], list)):
                raise ValueError("Value isn't a list of tuples")
            return val
        else:
            raise ValueError("Value isn't a list")

    # dispatch table: parameter type value -> validation function
    switch = {
        "String": validateString,
        "Number": validateNumber,
        "Boolean": validateBoolean,
        "File": validateFile,
        "FileMultiple": validateArray,
        "Array": validateArray,
        "CredentialJson": validateCredentialJSON,
        "ChooseOne": validatePass,
        "ChooseMultiple": validateChooseMultiple,
        "ChooseOneCustom": validatePass,
        "PayloadList": validatePayloadList,
        "AgentConnect": validateAgentConnect,
        "LinkInfo": validateAgentConnect,
        "TypedArray": validateTypedArray,
    }

    def validate(self, type: ParameterType, val: any):
        # the table holds plain functions, so self is passed explicitly
        return self.switch[type.value](self, val)
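The `switch` dictionary in `TypeValidators` is a dispatch table: `validate` looks up a function by the parameter type's string value and calls it. The following is a minimal standalone sketch of that pattern, not the library's code. `MiniParameterType` and `MiniValidators` are hypothetical stand-ins, reduced to three types, so the snippet runs without `mythic_container` installed.

```python
import uuid
from enum import Enum


class MiniParameterType(Enum):
    # Hypothetical stand-in for ParameterType, reduced to three members.
    String = "String"
    Number = "Number"
    PayloadList = "PayloadList"


class MiniValidators:
    # Hypothetical stand-in for TypeValidators with the same lookup-and-call flow.
    def validate_string(self, val):
        return str(val)

    def validate_number(self, val):
        # Prefer int, fall back to float.
        try:
            return int(val)
        except Exception:
            return float(val)

    def validate_payload_list(self, val):
        # Raises ValueError when val is not a well-formed UUID string.
        return str(uuid.UUID(val, version=4))

    # Maps the enum's string value to the plain function defined above.
    switch = {
        "String": validate_string,
        "Number": validate_number,
        "PayloadList": validate_payload_list,
    }

    def validate(self, type: MiniParameterType, val):
        # The table stores plain functions, so self is passed explicitly.
        return self.switch[type.value](self, val)


validators = MiniValidators()
as_int = validators.validate(MiniParameterType.Number, "42")
as_float = validators.validate(MiniParameterType.Number, "4.5")
as_text = validators.validate(MiniParameterType.String, 7)
```

Supporting a new type in this sketch means adding one function and one table entry; `validate` itself stays unchanged.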
class TaskArguments(metaclass=ABCMeta):
"""Definition of how to process a task's arguments (what the user supplied) into the corresponding command's parameters
Attributes:
command_line (str):
The command line (after parsing by the UI if tasked through the UI)
raw_command_line (str):
The exact thing the user typed/submitted without any parsing
tasking_location (str):
Location of where the task came from (command_line, mythic ui, modal, browser script, etc)
parameter_group_name (str):
The name of the parameter group identified for this set of arguments with values
args (list[CommandParameter]):
The running/updated set of command parameters supplied with values from the user's tasking
Functions:
get_arg:
Return the value for a certain command parameter (either default value or what the user supplied)
has_arg:
Check if a certain command parameter exists
add_arg:
Add/update a command parameter with the specified values, types, and parameter groups
set_arg:
Set the value for a certain command parameter explicitly
rename_arg:
Rename a command parameter's name temporarily
remove_arg:
Remove a command parameter temporarily
set_manual_args:
Explicitly set the string value that'll be passed down to the callback for this task instead of allowing Mythic to determine the JSON string
load_args_from_json_string:
Given a JSON string, parse it into a dictionary and pre-populate command parameters with those values. Any extras are ignored.
load_args_from_dictionary:
Given a dictionary, pre-populate command parameters with those values. Any extras are ignored.
get_parameter_group_name:
Get the current parameter group name based on all the set command parameter values up to this point
get_parameter_group_arguments:
Get all the command parameters that match the current parameter group name
verify_required_args_have_values:
Verify that all command parameters for the current parameter group that are required have values
parse_arguments:
Parse the string supplied by the user into the appropriate command parameters. Often this is just a call to load_args_from_json_string, but if the user typed free-form arguments, you might need to do additional parsing here on the self.command_line or self.raw_command_line fields.
"""
manual_args: str = None
    def __init__(self,
                 command_line: str = "",
                 tasking_location: str = "command_line",
                 raw_command_line: str = "",
                 task_dictionary: dict = None,
                 initial_parameter_group: str = ""):
        self.command_line = str(command_line)
        self.tasking_location = tasking_location
        self.raw_command_line = raw_command_line
        # create a fresh dictionary per instance instead of sharing one mutable default
        self.task_dictionary = task_dictionary if task_dictionary is not None else {}
        self.parameter_group_name = None
        self.initial_parameter_group = initial_parameter_group
@property
def args(self) -> list[CommandParameter]:
return self._args
@args.setter
def args(self, args):
self._args = args
def get_arg(self, key: str):
for arg in self.args:
if arg.name == key:
return arg.value
return None
def set_manual_args(self, params: str):
self.manual_args = params
def has_arg(self, key: str) -> bool:
for arg in self.args:
if arg.name == key:
return True
return False
def get_command_line(self) -> str:
return self.command_line
def get_raw_command_line(self) -> str:
return self.raw_command_line
def get_tasking_location(self) -> str:
return self.tasking_location
def is_empty(self) -> bool:
return len(self.args) == 0
def add_arg(self, key: str, value, type: ParameterType = None,
parameter_group_info: [ParameterGroupInfo] = None):
found = False
for arg in self.args:
if arg.name == key:
if type is not None:
arg.type = type
if parameter_group_info is not None:
arg.parameter_group_info = parameter_group_info
arg.value = value
found = True
if not found:
newGroupInfo = [ParameterGroupInfo()]
if parameter_group_info is not None:
newGroupInfo = parameter_group_info
if type is not None:
self.args.append(
CommandParameter(name=key, type=type, value=value, parameter_group_info=newGroupInfo))
else:
self.args.append(CommandParameter(name=key, type=ParameterType.String, value=value,
parameter_group_info=newGroupInfo))
def set_arg(self, key: str, value):
found = False
for arg in self.args:
if arg.name == key:
arg.value = value
found = True
if not found:
self.add_arg(key, value)
def rename_arg(self, old_key: str, new_key: str):
for arg in self.args:
if arg.name == old_key:
arg.name = new_key
return
raise Exception("{} not a valid parameter name".format(old_key))
def remove_arg(self, key: str):
self.args = [x for x in self.args if x.name != key]
def to_json(self):
return [x.to_json() for x in self.args]
def load_args_from_json_string(self, command_line: str, add_unknown_args: bool = False) -> None:
try:
temp_dict = json.loads(command_line)
for k, v in temp_dict.items():
found = False
for arg in self.args:
if arg.name == k or arg.cli_name == k:
arg.value = v
found = True
if not found and add_unknown_args:
if isinstance(v, bool):
self.add_arg(key=k, value=v, type=ParameterType.Boolean)
elif isinstance(v, int) or isinstance(v, float):
self.add_arg(key=k, value=v, type=ParameterType.Number)
elif isinstance(v, dict):
self.add_arg(key=k, value=v, type=ParameterType.ConnectionInfo)
elif isinstance(v, list):
self.add_arg(key=k, value=v, type=ParameterType.Array)
else:
self.add_arg(key=k, value=v, type=ParameterType.String)
except Exception as e:
logger.error(e)
logger.error("Tried parsing command line as JSON when it's not")
return
def load_args_from_dictionary(self, dictionary, add_unknown_args: bool = False) -> None:
for k, v in dictionary.items():
found = False
for arg in self.args:
if arg.name == k or arg.cli_name == k:
arg.value = v
found = True
if not found and add_unknown_args:
if isinstance(v, bool):
self.add_arg(key=k, value=v, type=ParameterType.Boolean)
elif isinstance(v, int) or isinstance(v, float):
self.add_arg(key=k, value=v, type=ParameterType.Number)
elif isinstance(v, dict):
self.add_arg(key=k, value=v, type=ParameterType.ConnectionInfo)
elif isinstance(v, list):
self.add_arg(key=k, value=v, type=ParameterType.Array)
else:
self.add_arg(key=k, value=v, type=ParameterType.String)
    def get_parameter_group_name(self) -> str:
        if self.parameter_group_name is not None:
            return self.parameter_group_name
        elif self.manual_args is not None:
            return "Custom"
        groupNameOptions = []
        suppliedArgNames = []
        if len(self.args) == 0:
            return "Default"
        for arg in self.args:
            for group_info in arg.parameter_group_info:
                if group_info.group_name not in groupNameOptions:
                    groupNameOptions.append(group_info.group_name)
        for arg in self.args:
            # when determining the group we're in, only look at arguments that have values that were set by the user
            # default values don't count
            if arg.value is not None and arg.user_supplied:
                suppliedArgNames.append(arg.name)
                groupNameIntersection = []
                for group_info in arg.parameter_group_info:
                    if group_info.group_name in groupNameOptions:
                        groupNameIntersection.append(group_info.group_name)
                groupNameOptions = groupNameIntersection
        # this gives us the groups that our currently supplied commands belong to, but doesn't account for some groups still needing other parameters
        # need to loop through available options and see if we have all of the needed parameters
        if len(groupNameOptions) == 0:
            raise ValueError(f"Supplied Arguments, {suppliedArgNames}, don't match any parameter group")
        elif len(groupNameOptions) > 1:
            finalMatchingGroupNames = []
            for groupNameOption in groupNameOptions:
                has_all_values = True
                for arg in self.args:
                    for group_info in arg.parameter_group_info:
                        if group_info.group_name == groupNameOption:
                            if group_info.required and not arg.user_supplied:
                                # one of our matching parameter groups that we've supplied values for requires this parameter, but the user didn't supply one
                                # so this parameter group can't be the one we use
                                has_all_values = False
                if has_all_values:
                    finalMatchingGroupNames.append(groupNameOption)
            if len(finalMatchingGroupNames) == 0:
                raise ValueError(
                    f"Supplied Arguments, {suppliedArgNames}, match more than one parameter group, {groupNameOptions}, and all require at least one more value from the user")
            elif len(finalMatchingGroupNames) > 1:
                if self.initial_parameter_group in finalMatchingGroupNames:
                    return self.initial_parameter_group
                raise ValueError(
                    f"Supplied Arguments, {suppliedArgNames}, match more than one parameter group, {finalMatchingGroupNames}")
            else:
                return finalMatchingGroupNames[0]
        else:
            return groupNameOptions[0]
    def get_parameter_group_arguments(self) -> list[CommandParameter]:
        groupName = self.get_parameter_group_name()
        group_arguments = []
        for arg in self.args:
            matched_arg = False
            for group_info in arg.parameter_group_info:
                if group_info.group_name == groupName:
                    matched_arg = True
            if matched_arg:
                group_arguments.append(arg)
        return group_arguments
async def verify_required_args_have_values(self) -> bool:
# first we have to establish which parameter group we're in
if self.manual_args is not None:
return True
groupName = self.get_parameter_group_name()
for arg in self.args:
matched_arg = False
arg_required = False
for group_info in arg.parameter_group_info:
if group_info.group_name == groupName:
matched_arg = True
arg_required = group_info.required
if matched_arg:
if arg.value is None:
arg.value = arg.default_value
if arg_required and arg.value is None:
raise ValueError("Required arg {} has no value".format(arg.name))
return True
async def get_unused_args(self) -> str:
if len(self.args) > 0:
caughtException = ""
try:
if self.manual_args is not None:
groupName = ""
else:
groupName = self.get_parameter_group_name()
except Exception as e:
logger.error(f"Failed to get group name for tasking: {e}\n")
caughtException = f"Failed to get group name for tasking: {e}\n"
groupName = "N/A"
temp = {}
for arg in self.args:
matched_arg = False
for group_info in arg.parameter_group_info:
if group_info.group_name == groupName:
matched_arg = True
if not matched_arg:
if isinstance(arg.value, bytes):
temp[arg.name] = base64.b64encode(arg.value).decode()
else:
temp[arg.name] = arg.value
return f"The following args aren't being used because they don't belong to the {groupName} parameter group: \n{json.dumps(temp, indent=2)}\n{caughtException}"
else:
return ""
def __str__(self) -> str:
if self.manual_args is not None:
if isinstance(self.manual_args, dict):
return json.dumps(self.manual_args)
else:
return str(self.manual_args)
if len(self.args) > 0:
try:
groupName = self.get_parameter_group_name()
except Exception as e:
return self.command_line
temp = {}
for arg in self.args:
matched_arg = False
for group_info in arg.parameter_group_info:
if group_info.group_name == groupName:
matched_arg = True
if matched_arg:
if isinstance(arg.value, bytes):
temp[arg.name] = base64.b64encode(arg.value).decode()
elif arg.value is not None:
temp[arg.name] = arg.value
else:
logger.debug(f"Argument {arg.name} has a Null value, not adding it to JSON")
return json.dumps(temp)
else:
return self.command_line
@abstractmethod
async def parse_arguments(self) -> None:
pass
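The group-selection rules in `get_parameter_group_name` can be easier to follow in isolation. The sketch below is a simplified, standalone restatement of that logic, not the library's code: `SketchArg` and `resolve_group` are hypothetical names, argument values are reduced to a `user_supplied` flag, and the `manual_args` and `initial_parameter_group` shortcuts are left out.

```python
from dataclasses import dataclass, field


@dataclass
class SketchArg:
    # Hypothetical, simplified stand-in for CommandParameter.
    name: str
    groups: dict = field(default_factory=dict)  # group name -> required flag
    user_supplied: bool = False


def resolve_group(args: list[SketchArg]) -> str:
    if not args:
        return "Default"
    # Start with every group any argument belongs to, in first-seen order.
    candidates = []
    for arg in args:
        for group in arg.groups:
            if group not in candidates:
                candidates.append(group)
    # Each user-supplied argument narrows the candidates to its own groups.
    for arg in args:
        if arg.user_supplied:
            candidates = [g for g in arg.groups if g in candidates]
    if not candidates:
        raise ValueError("supplied arguments don't match any parameter group")
    if len(candidates) == 1:
        return candidates[0]
    # Several groups remain: keep those whose required arguments were all supplied.
    complete = [
        g for g in candidates
        if all(a.user_supplied for a in args if a.groups.get(g, False))
    ]
    if len(complete) != 1:
        raise ValueError(f"ambiguous parameter groups: {complete or candidates}")
    return complete[0]


# A made-up command with two groups: "Default" needs file, "Path" needs path.
chosen = resolve_group([
    SketchArg("file", {"Default": True}),
    SketchArg("path", {"Path": True}, user_supplied=True),
    SketchArg("host", {"Default": False, "Path": False}, user_supplied=True),
])
```

Here `chosen` is `"Path"`, because the supplied `path` argument belongs only to that group. Supplying only `host` would raise, since both groups still lack a required value.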
class Callback:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class BrowserScript:
    """Location and author of browser script code for a command's output

    Attributes:
        script_name (str): The name of the javascript file
        author (str): The name (or handle) of the author of the script

    Functions:
        to_json(self, base_path): return dictionary form of class, reading the script's code from disk
    """

    def __init__(self, script_name: str, author: str = None, for_new_ui: bool = False):
        # for_new_ui is accepted but not stored or used by this class
        self.script_name = script_name
        self.author = author

    def to_json(self, base_path: Path):
        # look for <base_path>/<script_name>.js first, then treat script_name as a path of its own
        code_file = base_path / "{}.js".format(self.script_name)
        if code_file.exists():
            code = code_file.read_bytes().decode()
            return {"script": code, "name": self.script_name, "author": self.author}
        elif Path(self.script_name).exists():
            code = Path(self.script_name).read_bytes().decode()
            return {"script": code, "name": self.script_name, "author": self.author}
        else:
            raise Exception(
                "Code for Browser Script, " + self.script_name + ", does not exist on disk at path: " + str(
                    code_file))
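`BrowserScript.to_json` checks two locations for the script: `<base_path>/<script_name>.js` first, then `script_name` treated as a path of its own. The sketch below reproduces that lookup order with only the standard library. `load_script` and the `ls_new` script name are hypothetical, chosen for illustration.

```python
import tempfile
from pathlib import Path


def load_script(base_path: Path, script_name: str) -> str:
    # Hypothetical helper mirroring the two lookups in BrowserScript.to_json.
    code_file = base_path / f"{script_name}.js"
    if code_file.exists():
        return code_file.read_text(encoding="utf-8")
    if Path(script_name).exists():
        return Path(script_name).read_text(encoding="utf-8")
    raise FileNotFoundError(f"no browser script found at {code_file}")


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    script = base / "ls_new.js"
    script.write_text("function(task, responses){ return {}; }", encoding="utf-8")
    # Found by bare name under the base path.
    by_name = load_script(base, "ls_new")
    # Found by explicit path when the base path lookup misses.
    by_path = load_script(base / "missing", str(script))
```

Both calls return the same file contents. The second one works only because of the fallback to `script_name` as a path.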
class MythicTask:
"""Instance of Mythic Tasking used with `create_tasking`. Use `create_go_tasking` instead.
Deprecated
"""
def __init__(
self,
taskinfo: dict,
args: TaskArguments,
callback_info: dict
):
self.id = taskinfo["id"]
self.original_params = taskinfo["original_params"]
self.completed = taskinfo["completed"]
self.callback = Callback(**callback_info)
self.agent_task_id = taskinfo["agent_task_id"]
self.token = taskinfo["token_id"]
if self.token is not None and self.token <= 0:
self.token = None
self.operator = taskinfo["operator_username"]
self.opsec_pre_blocked = taskinfo["opsec_pre_blocked"]
self.opsec_pre_message = taskinfo["opsec_pre_message"]
self.opsec_pre_bypassed = taskinfo["opsec_pre_bypassed"]
self.opsec_pre_bypass_role = taskinfo["opsec_pre_bypass_role"]
self.opsec_post_blocked = taskinfo["opsec_post_blocked"]
self.opsec_post_message = taskinfo["opsec_post_message"]
self.opsec_post_bypassed = taskinfo["opsec_post_bypassed"]
self.opsec_post_bypass_role = taskinfo["opsec_post_bypass_role"]
self.display_params = taskinfo["display_params"]
self.command_name = taskinfo["command_name"]
self.args = args
self.manual_args = None
self.status = MythicStatus.Preprocessing
if 'status' in taskinfo and taskinfo['status'] is not None:
self.status = MythicStatus(taskinfo['status'])
self.tasking_location = taskinfo["tasking_location"] if "tasking_location" in taskinfo else "command_line"
self.stdout = taskinfo["stdout"] if "stdout" in taskinfo else ""
self.stderr = taskinfo["stderr"] if "stderr" in taskinfo else ""
self.subtask_callback_function = taskinfo["subtask_callback_function"]
self.group_callback_function = taskinfo["group_callback_function"]
self.completed_callback_function = taskinfo["completed_callback_function"]
self.subtask_group_name = taskinfo["subtask_group_name"]
self.parameter_group_name = taskinfo["parameter_group_name"]
# self.tags is an array of tags to associate with the task
def get_status(self) -> MythicStatus:
return self.status
def set_status(self, status: MythicStatus):
self.status = status
def set_stdout(self, stdout: str):
self.stdout = stdout
def set_stderr(self, stderr: str):
self.stderr = stderr
# if you call override_args with your own values, then we won't use the standard JSON string from self.args
# this combined with command_name can allow you to completely set what gets sent to your agent
def override_args(self, args: str):
self.manual_args = args
def __str__(self):
return json.dumps(self.to_json())
def to_json(self):
subtask_callback_function = self.subtask_callback_function
if callable(subtask_callback_function):
subtask_callback_function = subtask_callback_function.__name__
group_callback_function = self.group_callback_function
if callable(group_callback_function):
group_callback_function = group_callback_function.__name__
completed_callback_function = self.completed_callback_function
if callable(completed_callback_function):
completed_callback_function = completed_callback_function.__name__
command_args = str(self.args)
if self.manual_args is not None:
command_args = self.manual_args
return {
"args": command_args,
"stdout": self.stdout,
"stderr": self.stderr,
"opsec_pre_blocked": self.opsec_pre_blocked,
"opsec_pre_message": self.opsec_pre_message,
"opsec_pre_bypass_role": self.opsec_pre_bypass_role,
"opsec_pre_bypassed": self.opsec_pre_bypassed,
"opsec_post_blocked": self.opsec_post_blocked,
"opsec_post_message": self.opsec_post_message,
"opsec_post_bypass_role": self.opsec_post_bypass_role,
"opsec_post_bypassed": self.opsec_post_bypassed,
"display_params": self.display_params,
"subtask_callback_function": subtask_callback_function,
"group_callback_function": group_callback_function,
"completed_callback_function": completed_callback_function,
"subtask_group_name": self.subtask_group_name,
"command_name": self.command_name,
"parameter_group_name": self.parameter_group_name,
}
class AgentResponse:
def __init__(self, response: any, task: MythicTask):
self.response = response
self.task = task
class PTTTaskOPSECPreTaskMessageResponse:
"""Result of running an OPSEC check against a task's information before the create_go_tasking function is called
Attributes:
TaskID (int): The task this response is referring to
Success (bool): Did the check happen successfully or not
Error (str): If the check failed to run, this Error provides the message as to why
OpsecPreBlocked (bool): Is this task blocked from running or not?
OpsecPreMessage (str): What information do you want to present back to the user as to why it was blocked or not?
OpsecPreBypassed (bool): Is the block bypassed? You can opt to mark the task as blocked but still allow it through.
OpsecPreBypassRole (str): If the task is blocked, who is allowed to bypass it? Can be 'operator', 'lead', or 'other_operator'.
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
TaskID: int = None,
Success: bool = False,
Error: str = "",
OpsecPreBlocked: bool = False,
OpsecPreMessage: str = "",
OpsecPreBypassed: bool = None,
OpsecPreBypassRole: str = "operator",
**kwargs):
self.TaskID = TaskID
self.Success = Success
self.Error = Error
self.OpsecPreBlocked = OpsecPreBlocked
self.OpsecPreMessage = OpsecPreMessage
self.OpsecPreBypassed = OpsecPreBypassed
self.OpsecPreBypassRole = OpsecPreBypassRole
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"task_id": self.TaskID,
"success": self.Success,
"error": self.Error,
"opsec_pre_blocked": self.OpsecPreBlocked,
"opsec_pre_message": self.OpsecPreMessage,
"opsec_pre_bypassed": self.OpsecPreBypassed,
"opsec_pre_bypass_role": self.OpsecPreBypassRole
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTTTaskOPSECPostTaskMessageResponse:
"""Result of running an OPSEC check against a task's information after the create_go_tasking function is called.
This can be useful in case the create_go_tasking function created additional files that you want to check for OPSEC issues first before allowing an agent to pick up the task.
Attributes:
TaskID (int): The task this response is referring to
Success (bool): Did the check happen successfully or not
Error (str): If the check failed to run, this Error provides the message as to why
OpsecPostBlocked (bool): Is this task blocked from running or not?
OpsecPostMessage (str): What information do you want to present back to the user as to why it was blocked or not?
OpsecPostBypassed (bool): Is the block bypassed? You can opt to mark the task as blocked but still allow it through.
OpsecPostBypassRole (str): If the task is blocked, who is allowed to bypass it? Can be 'operator', 'lead', or 'other_operator'.
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
TaskID: int = None,
Success: bool = False,
Error: str = "",
OpsecPostBlocked: bool = False,
OpsecPostMessage: str = "",
OpsecPostBypassed: bool = None,
OpsecPostBypassRole: str = "operator",
**kwargs):
self.TaskID = TaskID
self.Success = Success
self.Error = Error
self.OpsecPostBlocked = OpsecPostBlocked
self.OpsecPostMessage = OpsecPostMessage
self.OpsecPostBypassed = OpsecPostBypassed
self.OpsecPostBypassRole = OpsecPostBypassRole
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"task_id": self.TaskID,
"success": self.Success,
"error": self.Error,
"opsec_post_blocked": self.OpsecPostBlocked,
"opsec_post_message": self.OpsecPostMessage,
"opsec_post_bypassed": self.OpsecPostBypassed,
"opsec_post_bypass_role": self.OpsecPostBypassRole
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTTaskCreateTaskingMessageResponse:
"""Result of running a command's create_go_tasking function.
Note: If you want to modify the arguments, use taskData.args, not the Params field here.
Attributes:
TaskID (int): The task this response is referring to
Success (bool): Did the create_go_tasking function execute properly or was there an error somewhere?
Error (str): If the create_go_tasking failed to run, this Error provides the message as to why
CommandName (str): If you want to change the command name that gets sent down to the agent from the name of the current command, change that here. If you don't want to change it, leave it as None.
TaskStatus (str): If you want to set a specific status (such as 'success', 'completed', or an error status), set that here. If you want things to continue as normal, leave it as None.
DisplayParams (str): If you want to change what the operator sees displayed in the UI to something else, set that here. This is helpful to get rid of JSON and replace it with something easier to parse for people.
Stdout (str): If you want to save off some output from the `create_go_tasking` function, but don't want to display it to the user, this is a great way to save that. An example might be build information if you're compiling additional files.
Stderr (str): If you want to save off some error output from the `create_go_tasking` function, but don't want to display it to the user, this is a great way to save that.
Completed (bool): If you want to mark this task as "completed" in the UI so that the callback doesn't pick it up, set this to True.
TokenID (int): If you want to add/change/remove the token id associated with this task, change this value.
CompletionFunctionName (str): If you want to register a completion function to execute once this task is complete, provide the name here. Your command definition must have a matching dictionary entry with function name to actual function to call.
ParameterGroupName (str): If you want to override the automatic detection of the parameter group after the create_go_tasking function executes, set the name here.
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
TaskID: int = None,
Success: bool = True,
Error: str = "",
CommandName: str = None,
TaskStatus: str = None,
DisplayParams: str = None,
Stdout: str = None,
Stderr: str = None,
Completed: bool = None,
TokenID: int = None,
CompletionFunctionName: str = None,
Params: str = None,
ParameterGroupName: str = None,
**kwargs):
self.TaskID = TaskID
self.Success = Success
self.Error = Error
self.CommandName = CommandName
self.TaskStatus = TaskStatus
self.DisplayParams = DisplayParams
self.Stdout = Stdout
self.Stderr = Stderr
self.Completed = Completed
self.TokenID = TokenID
if self.TokenID is not None and self.TokenID <= 0:
self.TokenID = None
self.CompletionFunctionName = CompletionFunctionName
self.Params = Params
self.ParameterGroupName = ParameterGroupName
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"task_id": self.TaskID,
"success": self.Success,
"error": self.Error,
"command_name": self.CommandName,
"task_status": self.TaskStatus,
"display_params": self.DisplayParams,
"stdout": self.Stdout,
"stderr": self.Stderr,
"completed": self.Completed,
"token_id": self.TokenID,
"completion_function_name": self.CompletionFunctionName,
"params": self.Params,
"parameter_group_name": self.ParameterGroupName
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
InteractiveMessageType = {
0: ("Input", 0),
1: ("Output", 1),
2: ("Error", 2),
3: ("Exit", 3),
4: ("^[", 0x1B),
5: ("^A", 0x01),
6: ("^B", 0x02),
7: ("^C", 0x03),
8: ("^D", 0x04),
9: ("^E", 0x05),
10: ("^F", 0x06),
11: ("^G", 0x07),
12: ("^H", 0x08),
13: ("^I", 0x09),
14: ("^K", 0x0B),
15: ("^L", 0x0C),
16: ("^N", 0x0E),
17: ("^P", 0x10),
18: ("^Q", 0x11),
19: ("^R", 0x12),
20: ("^S", 0x13),
21: ("^U", 0x15),
22: ("^W", 0x17),
23: ("^Y", 0x19),
24: ("^Z", 0x1A)
}
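Entries 4 through 24 of `InteractiveMessageType` pair a caret-notation label with a control byte. The sketch below shows how those byte values follow from the labels under the usual caret-notation convention (the character's code XOR 0x40). `caret_to_byte` is a hypothetical helper for illustration, not part of the library.

```python
def caret_to_byte(label: str) -> int:
    # Hypothetical helper: "^C" -> 0x03 by flipping bit 6 of the character code.
    if len(label) != 2 or label[0] != "^":
        raise ValueError(f"not caret notation: {label!r}")
    return ord(label[1]) ^ 0x40


# A few (label, byte) pairs copied from the InteractiveMessageType table.
samples = [("^[", 0x1B), ("^A", 0x01), ("^C", 0x03), ("^Z", 0x1A)]
matches = all(caret_to_byte(label) == value for label, value in samples)
```

The first four entries (`Input`, `Output`, `Error`, `Exit`) are plain message kinds, not control characters, so the helper does not apply to them.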
class PTTaskMessageTaskData:
"""A container for all information about a task.
Note: Lowercase names are used in the __init__ function to auto-populate from JSON, but attributes are upper case.
Attributes:
ID (int): The unique ID of the task within Mythic, this is used for various RPC calls.
DisplayID (int): The numerically increasing ID of a task that's shown to the user in the Mythic UI.
AgentTaskID (str): The UUID of a task that's sent down to a callback.
CommandName (str): The name of the command to execute within the callback.
Params (str): A string representation of the parameters that is sent down to the callback.
Timestamp (str): A string representation of the last time something about the task changed
CallbackID (int): The unique ID of the callback for this task
Status (str): The current status of this task (likely to be preprocessing in create_go_tasking)
OriginalParams (str): The original parameters that the user supplied (after processing by the Mythic UI)
DisplayParams (str): The modified parameters if any were set to be easier for operators to read (defaults to the same as OriginalParams)
Comment (str): The comment on the task if one exists
Stdout (str): Additional stdout for the task if any was set as part of the create_go_tasking function
Stderr (str): Additional stderr for the task if any was set as part of the create_go_tasking function
Completed (bool): Indicating if this task is completed or not
OperatorUsername (str): The name of the operator that issued this task
OpsecPreBlocked (bool): If this task was originally blocked in the opsec pre check
OpsecPreMessage (str): A message (if any) provided as part of the opsec pre check
OpsecPreBypassed (bool): If this task's opsec block was bypassed
OpsecPreBypassRole (str): Who is able to bypass this opsec block
OpsecPostBlocked (bool): If the task was blocked in the opsec post check
OpsecPostMessage (str): A message (if any) provided as part of the opsec post check
OpsecPostBypassed (bool): If this task's opsec block was bypassed
OpsecPostBypassRole (str): Who is able to bypass this opsec block
ParentTaskID (int): If this is a subtask of some other task, this is the ID field for that task
SubtaskCallbackFunction (str): If this is a subtask and the parent task wants a specific callback function to be executed when this task finishes, the name of that function is here
SubtaskCallbackFunctionCompleted (bool): Indication of the completion status of that subtask callback function
GroupCallbackFunction (str): If this is a subtask and part of a group and the parent task wants a specific callback function to be executed when this group of tasks finishes, the name of that function is here
GroupCallbackFunctionCompleted (bool): Indication of the completion status of that subtask group callback function
CompletedCallbackFunction (str): If THIS task wants a specific function to execute when it finishes, the name of that function will be here
CompletedCallbackFunctionCompleted (bool): Indication of the completion status of that callback function
SubtaskGroupName (str): If this is a subtask and part of a named group, the name of the group will be here
TaskingLocation (str): The location from where this tasking came (modal, parsed cli, browser script, etc)
ParameterGroupName (str): The name of the parameter group for this task's command parameters
TokenID (int): The identifier for the token associated with this task (if one was specified)
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
id: int = 0,
display_id: int = 0,
agent_task_id: str = "",
command_name: str = "",
params: str = "",
timestamp: str = "",
callback_id: int = 0,
callback_display_id: int = 0,
payload_type: str = "",
operator_id: int = 0,
status: str = "",
original_params: str = "",
display_params: str = "",
comment: str = "",
stdout: str = "",
stderr: str = "",
completed: bool = False,
operator_username: str = "",
opsec_pre_blocked: bool = False,
opsec_pre_message: str = "",
opsec_pre_bypassed: bool = False,
opsec_pre_bypass_role: str = "",
opsec_post_blocked: bool = False,
opsec_post_message: str = "",
opsec_post_bypassed: bool = False,
opsec_post_bypass_role: str = "",
parent_task_id: int = 0,
subtask_callback_function: str = "",
subtask_callback_function_completed: bool = False,
group_callback_function: str = "",
group_callback_function_completed: bool = False,
completed_callback_function: str = "",
completed_callback_function_completed: bool = False,
subtask_group_name: str = "",
tasking_location: str = "",
parameter_group_name: str = "",
token_id: int = None,
response_count: int = None,
is_interactive_task: bool = None,
interactive_task_type: int = None,
**kwargs):
self.ID = id
self.DisplayID = display_id
self.AgentTaskID = agent_task_id
self.CommandName = command_name
self.Params = params
self.Timestamp = timestamp
self.CallbackID = callback_id
self.CallbackDisplayID = callback_display_id
self.PayloadType = payload_type
self.Status = status
self.OriginalParams = original_params
self.DisplayParams = display_params
self.Comment = comment
self.Stdout = stdout
self.Stderr = stderr
self.Completed = completed
self.OperatorUsername = operator_username
self.OperatorID = operator_id
self.OpsecPreBlocked = opsec_pre_blocked
self.OpsecPreMessage = opsec_pre_message
self.OpsecPreBypassed = opsec_pre_bypassed
self.OpsecPreBypassRole = opsec_pre_bypass_role
self.OpsecPostBlocked = opsec_post_blocked
self.OpsecPostMessage = opsec_post_message
self.OpsecPostBypassed = opsec_post_bypassed
self.OpsecPostBypassRole = opsec_post_bypass_role
self.ParentTaskID = parent_task_id
self.SubtaskCallbackFunction = subtask_callback_function
self.SubtaskCallbackFunctionCompleted = subtask_callback_function_completed
self.GroupCallbackFunction = group_callback_function
self.GroupCallbackFunctionCompleted = group_callback_function_completed
self.CompletedCallbackFunction = completed_callback_function
self.CompletedCallbackFunctionCompleted = completed_callback_function_completed
self.SubtaskGroupName = subtask_group_name
self.TaskingLocation = tasking_location
self.ParameterGroupName = parameter_group_name
self.TokenID = token_id
if self.TokenID is not None and self.TokenID <= 0:
self.TokenID = None
self.ResponseCount = response_count
self.IsInteractiveTask = is_interactive_task
self.InteractiveTaskType = interactive_task_type
def to_json(self):
return {
"id": self.ID,
"display_id": self.DisplayID,
"agent_task_id": self.AgentTaskID,
"command_name": self.CommandName,
"params": self.Params,
"timestamp": self.Timestamp,
"callback_id": self.CallbackID,
"callback_display_id": self.CallbackDisplayID,
"payload_type": self.PayloadType,
"operator_id": self.OperatorID,
"status": self.Status,
"original_params": self.OriginalParams,
"display_params": self.DisplayParams,
"comment": self.Comment,
"stdout": self.Stdout,
"stderr": self.Stderr,
"completed": self.Completed,
"operator_username": self.OperatorUsername,
"opsec_pre_blocked": self.OpsecPreBlocked,
"opsec_pre_message": self.OpsecPreMessage,
"opsec_pre_bypass_role": self.OpsecPreBypassRole,
"opsec_pre_bypassed": self.OpsecPreBypassed,
"opsec_post_blocked": self.OpsecPostBlocked,
"opsec_post_message": self.OpsecPostMessage,
"opsec_post_bypass_role": self.OpsecPostBypassRole,
"opsec_post_bypassed": self.OpsecPostBypassed,
"parent_task_id": self.ParentTaskID,
"subtask_callback_function": self.SubtaskCallbackFunction,
"subtask_callback_function_completed": self.SubtaskCallbackFunctionCompleted,
"group_callback_function": self.GroupCallbackFunction,
"group_callback_function_completed": self.GroupCallbackFunctionCompleted,
"completed_callback_function": self.CompletedCallbackFunction,
"completed_callback_function_completed": self.CompletedCallbackFunctionCompleted,
"subtask_group_name": self.SubtaskGroupName,
"tasking_location": self.TaskingLocation,
"parameter_group_name": self.ParameterGroupName,
"token_id": self.TokenID,
"response_count": self.ResponseCount,
"is_interactive_task": self.IsInteractiveTask,
"interactive_task_type": self.InteractiveTaskType
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
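The constructor above takes snake_case keyword arguments and stores them as PascalCase attributes, which is what allows a decoded JSON dictionary to be unpacked straight into the class. The following is a minimal sketch of that pattern, not the library class itself: `TaskDataSketch` is a hypothetical stand-in with only three of the fields, the `unknown` attribute is an illustration-only addition, and the field values are made up.

```python
import json


class TaskDataSketch:
    """Hypothetical, trimmed-down stand-in for PTTaskMessageTaskData."""

    def __init__(self, id: int = 0, command_name: str = "", token_id: int = None, **kwargs):
        self.ID = id
        self.CommandName = command_name
        # Mirror the listing: non-positive token IDs are treated as "no token".
        self.TokenID = token_id if token_id is not None and token_id > 0 else None
        # Keys this sketch does not model are kept so the caller can inspect them.
        self.unknown = kwargs

    def to_json(self) -> dict:
        return {"id": self.ID, "command_name": self.CommandName, "token_id": self.TokenID}


# A message as it might arrive off the wire (values are made up).
raw = '{"id": 7, "command_name": "ls", "token_id": 0, "status": "submitted"}'
task = TaskDataSketch(**json.loads(raw))
```

Because of `**kwargs`, a field added to the wire format later does not break construction of an older class; it simply lands in the catch-all.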
class PTTaskMessageCallbackData:
"""A container for all information about a callback.
Note: Lowercase names are used in the __init__ function to auto-populate from JSON, but attributes are upper case.
Attributes:
ID (int): The unique ID of the callback within Mythic, this is used for various RPC calls.
DisplayID (int): The numerically increasing ID of a callback that's shown to the user in the Mythic UI.
AgentCallbackID (str): The UUID of a callback that's sent down to a callback.
InitCallback (str): The time of the initial callback
LastCheckin (str): The time the callback last checked in
User (str): The user associated with the callback
Host (str): The hostname for the callback (always in all caps)
PID (int): The PID of the callback
IP (str): The string representation of the IP array for the callback
IPs (list[str]): An array of the IPs for the callback
ExternalIp (str): The external IP address (if identified) for the callback
ProcessName (str): The name of the process for the callback
Description (str): The description for the callback (by default it matches the description for the associated payload)
OperatorID (int): The ID of the operator that created the associated payload
OperatorUsername (str): The username of the operator that created the associated payload
Active (bool): Indicating if this callback is in the active callbacks table or not
IntegrityLevel (int): The integrity level for the callback that mirrors that of Windows (0-4) with a value of 3+ indicating a High Integrity (or root) callback
Locked (bool): Indicating if this callback is locked or not so that other operators can't task it
OperationID (int): The ID of the operation this callback belongs to
OperationName (str): The name of the operation this callback belongs to
CryptoType (str): The type of cryptography used for this callback (typically None or aes256_hmac)
DecKey (bytes): The decryption key for this callback
EncKey (bytes): The encryption key for this callback
OS (str): The OS information reported back by the callback (not the same as the payload os you selected when building the agent)
Architecture (str): The architecture of the process where this callback is executing
Domain (str): The domain associated with the callback if there is one
ExtraInfo (str): Freeform field of extra data that can be stored and retrieved with a callback
SleepInfo (str): Freeform sleep information that can be stored and retrieved as part of a callback (this isn't pre-populated, the agent or command files must set it)
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
id: int = 0,
display_id: int = 0,
agent_callback_id: str = "",
init_callback: str = "",
last_checkin: str = "",
user: str = "",
host: str = "",
pid: int = 0,
ip: str = "",
ips: list[str] = [],
external_ip: str = "",
process_name: str = "",
description: str = "",
operator_id: int = 0,
operator_username: str = "",
active: bool = False,
integrity_level: int = 0,
locked: bool = False,
operation_id: int = 0,
operation_name: str = "",
crypto_type: str = "",
os: str = "",
architecture: str = "",
domain: str = "",
extra_info: str = "",
sleep_info: str = "",
dec_key: str = None,
enc_key: str = None,
registered_payload_id: int = 0,
**kwargs):
self.ID = id
self.DisplayID = display_id
self.AgentCallbackID = agent_callback_id
self.InitCallback = init_callback
self.LastCheckin = last_checkin
self.User = user
self.Host = host
self.PID = pid
self.IP = ip
self.IPs = ips
self.ExternalIp = external_ip
self.ProcessName = process_name
self.Description = description
self.OperatorID = operator_id
self.OperatorUsername = operator_username
self.Active = active
self.IntegrityLevel = integrity_level
self.Locked = locked
self.OperationID = operation_id
self.OperationName = operation_name
self.CryptoType = crypto_type
self.DecKey = base64.b64decode(dec_key) if dec_key is not None else None
self.EncKey = base64.b64decode(enc_key) if enc_key is not None else None
self.OS = os
self.Architecture = architecture
self.Domain = domain
self.ExtraInfo = extra_info
self.SleepInfo = sleep_info
self.RegisteredPayloadID = registered_payload_id
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"id": self.ID,
"display_id": self.DisplayID,
"agent_callback_id": self.AgentCallbackID,
"init_callback": self.InitCallback,
"last_checkin": self.LastCheckin,
"user": self.User,
"host": self.Host,
"pid": self.PID,
"ip": self.IP,
"ips": self.IPs,
"external_ip": self.ExternalIp,
"process_name": self.ProcessName,
"description": self.Description,
"operator_id": self.OperatorID,
"operator_username": self.OperatorUsername,
"active": self.Active,
"integrity_level": self.IntegrityLevel,
"locked": self.Locked,
"operation_id": self.OperationID,
"operation_name": self.OperationName,
"crypto_type": self.CryptoType,
"dec_key": self.DecKey,
"enc_key": self.EncKey,
"os": self.OS,
"architecture": self.Architecture,
"domain": self.Domain,
"extra_info": self.ExtraInfo,
"sleep_info": self.SleepInfo
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
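In the listing above, `DecKey` and `EncKey` are base64-decoded into bytes in the constructor, and `to_json` returns them unchanged. Python's `json.dumps` has no default encoding for bytes, so if you need to log or serialize a callback's keys yourself, re-encoding them first is one option. The sketch below assumes that use case; `jsonable_key` is a hypothetical helper, not part of the library, and the key material is made up.

```python
import base64
import json
from typing import Optional

# Made-up key material, base64-encoded the way the constructor expects it.
wire_key = base64.b64encode(b"\x00\x01\x02\x03").decode()

# Same decoding rule as the listing: None stays None, anything else becomes bytes.
dec_key = base64.b64decode(wire_key) if wire_key is not None else None


def jsonable_key(key: Optional[bytes]) -> Optional[str]:
    """Hypothetical helper: turn raw key bytes back into a base64 string."""
    return base64.b64encode(key).decode() if key is not None else None


try:
    json.dumps({"dec_key": dec_key})
    dumped_raw_bytes = True
except TypeError:
    # json.dumps cannot serialize bytes objects without a custom encoder.
    dumped_raw_bytes = False

round_tripped = json.dumps({"dec_key": jsonable_key(dec_key)})
```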
class PTTaskMessagePayloadData:
"""A container for basic information about the payload associated with a task.
Note: Lowercase names are used in the __init__ function to auto-populate from JSON, but attributes are upper case.
Attributes:
OS (str): The operating system selected as the first step in generating this payload
UUID (str): The UUID for the payload
PayloadType (str): The name of the payload type associated with this payload
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
os: str = "",
uuid: str = "",
payload_type: str = "",
**kwargs):
self.OS = os
self.UUID = uuid
self.PayloadType = payload_type
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"os": self.OS,
"uuid": self.UUID,
"payload_type": self.PayloadType
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTTaskMessageAllData:
"""A container for all information about a Task including the task, callback, build parameters, commands, etc.
Note: Lowercase names are used in the __init__ function to auto-populate from JSON, but attributes are upper case.
Attributes:
Task (PTTaskMessageTaskData): The information about this task
Callback (PTTaskMessageCallbackData): The information about this task's callback
Payload (PTTaskMessagePayloadData): The information about this task's associated payload
Commands (list[str]): The names of all the commands currently loaded into this callback
PayloadType (str): The name of the payload type associated with this callback
CommandPayloadType (str): Name of the payload type associated with this task
BuildParameters (list[MythicRPCPayloadConfigurationBuildParameter]): Information about the build parameters used to generate the payload for this callback
C2Profiles (list[MythicRPCPayloadConfigurationC2Profile]): Information about the c2 profiles associated with this callback and their values
args: The running instance of arguments for this task; this allows you to modify any arguments as necessary in your `create_go_tasking` function
Secrets (dict): Dictionary of secrets associated with the user for this action
Functions:
to_json(self): return dictionary form of class
"""
args: TaskArguments
def __init__(self,
task: dict = {},
callback: dict = {},
build_parameters: list[dict] = [],
commands: list[str] = [],
payload: dict = {},
c2info: list[dict] = [],
payload_type: str = "",
command_payload_type: str = "",
args: TaskArguments.__class__ = None,
secrets: dict = {},
**kwargs):
self.Task = PTTaskMessageTaskData(**task)
self.Callback = PTTaskMessageCallbackData(**callback)
self.Payload = PTTaskMessagePayloadData(**payload)
self.Commands = commands
self.PayloadType = payload_type
self.CommandPayloadType = command_payload_type
self.BuildParameters = [MythicRPCPayloadConfigurationBuildParameter(**x) for x in build_parameters]
self.C2Profiles = [MythicRPCPayloadConfigurationC2Profile(**x) for x in c2info]
self.Secrets = secrets
if args is not None:
self.args = args(command_line=task["params"],
tasking_location=task["tasking_location"],
raw_command_line=task["original_params"],
task_dictionary=task,
initial_parameter_group=task["parameter_group_name"])
else:
self.args = args
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"task": self.Task.to_json(),
"callback": self.Callback.to_json(),
"build_parameters": [x.to_json() for x in self.BuildParameters],
"commands": self.Commands,
"payload": self.Payload.to_json(),
"c2info": [x.to_json() for x in self.C2Profiles],
"payload_type": self.PayloadType,
"command_payload_type": self.CommandPayloadType,
"secrets": self.Secrets
}
def set_args(self, args: TaskArguments.__class__) -> None:
self.args = args(command_line=self.Task.Params,
tasking_location=self.Task.TaskingLocation,
raw_command_line=self.Task.OriginalParams)
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTCallbacksToCheck:
def __init__(self, id: int = 0,
display_id: int = 0,
agent_callback_id: str = "",
initial_checkin: str = "",
last_checkin: str = "",
sleep_info: str = "",
active_c2_profiles: list[str] = []):
self.ID = id
self.DisplayID = display_id
self.AgentCallbackID = agent_callback_id
self.InitialCheckin = initial_checkin
self.LastCheckin = last_checkin
self.SleepInfo = sleep_info
self.ActiveC2Profiles = active_c2_profiles
def to_json(self):
return {
"id": self.ID,
"display_id": self.DisplayID,
"agent_callback_id": self.AgentCallbackID,
"sleep_info": self.SleepInfo,
"initial_checkin": self.InitialCheckin,
"last_checkin": self.LastCheckin,
"active_c2_profiles": self.ActiveC2Profiles
}
class PTCheckIfCallbacksAliveMessage:
def __init__(self,
container_name: str = "",
callbacks: list[dict] = []):
self.ContainerName = container_name
self.Callbacks = [PTCallbacksToCheck(**x) for x in callbacks]
def to_json(self):
return {
"container_name": self.ContainerName,
"callbacks": [x.to_json() for x in self.Callbacks]
}
class PTCallbacksToCheckResponse:
def __init__(self, ID: int = 0, Alive: bool = True):
self.ID = ID
self.Alive = Alive
def to_json(self):
return {
"id": self.ID,
"alive": self.Alive
}
class PTCheckIfCallbacksAliveMessageResponse:
def __init__(self, Success: bool = True, Error: str = "",
Callbacks: list[PTCallbacksToCheckResponse] = []):
self.Success = Success
self.Error = Error
self.Callbacks = Callbacks
def to_json(self):
return {
"success": self.Success,
"error": self.Error,
"callbacks": [x.to_json() for x in self.Callbacks]
}
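The four classes above form a request/response pair: the message lists callbacks to check, and the response reports one `alive` flag per callback ID. A rough sketch of how a handler might assemble that response is shown below. The two response classes are re-declared in trimmed form so the block runs on its own, `check_callbacks` is a hypothetical function name, and the liveness rule (a callback counts as alive if it has at least one active C2 profile) is a placeholder assumption, not the library's logic.

```python
class PTCallbacksToCheckResponse:
    # Re-declared from the listing so the sketch is self-contained.
    def __init__(self, ID: int = 0, Alive: bool = True):
        self.ID = ID
        self.Alive = Alive

    def to_json(self):
        return {"id": self.ID, "alive": self.Alive}


class PTCheckIfCallbacksAliveMessageResponse:
    # Re-declared from the listing; the list default is built per call here.
    def __init__(self, Success: bool = True, Error: str = "", Callbacks: list = None):
        self.Success = Success
        self.Error = Error
        self.Callbacks = Callbacks if Callbacks is not None else []

    def to_json(self):
        return {
            "success": self.Success,
            "error": self.Error,
            "callbacks": [x.to_json() for x in self.Callbacks],
        }


def check_callbacks(callbacks: list) -> PTCheckIfCallbacksAliveMessageResponse:
    """Hypothetical handler; the liveness rule is a placeholder assumption."""
    results = [
        PTCallbacksToCheckResponse(ID=cb["id"], Alive=len(cb.get("active_c2_profiles", [])) > 0)
        for cb in callbacks
    ]
    return PTCheckIfCallbacksAliveMessageResponse(Success=True, Callbacks=results)


# Made-up input in the shape of PTCallbacksToCheck.to_json() entries.
reply = check_callbacks([
    {"id": 1, "active_c2_profiles": ["http"]},
    {"id": 2, "active_c2_profiles": []},
]).to_json()
```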
class PTOnNewCallbackAllData:
"""A container for all information about a callback including the callback, build parameters, commands, etc.
Note: Lowercase names are used in the __init__ function to auto-populate from JSON, but attributes are upper case.
Attributes:
Callback (PTTaskMessageCallbackData): The information about this callback
Payload (PTTaskMessagePayloadData): The information about this callback's associated payload
Commands (list[str]): The names of all the commands currently loaded into this callback
PayloadType (str): The name of the payload type
BuildParameters (list[MythicRPCPayloadConfigurationBuildParameter]): Information about the build parameters used to generate the payload for this callback
C2Profiles (list[MythicRPCPayloadConfigurationC2Profile]): Information about the c2 profiles associated with this callback and their values
Secrets (dict): Dictionary of secrets from the user associated with this action
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
callback: dict = {},
build_parameters: list[dict] = [],
commands: list[str] = [],
payload: dict = {},
c2info: list[dict] = [],
payload_type: str = "",
secrets: dict = {},
**kwargs):
self.Callback = PTTaskMessageCallbackData(**callback)
self.Payload = PTTaskMessagePayloadData(**payload)
self.Commands = commands
self.PayloadType = payload_type
self.BuildParameters = [MythicRPCPayloadConfigurationBuildParameter(**x) for x in build_parameters]
self.C2Profiles = [MythicRPCPayloadConfigurationC2Profile(**x) for x in c2info]
self.Secrets = secrets
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"callback": self.Callback.to_json(),
"build_parameters": [x.to_json() for x in self.BuildParameters],
"commands": self.Commands,
"payload": self.Payload.to_json(),
"c2info": [x.to_json() for x in self.C2Profiles],
"payload_type": self.PayloadType,
"secrets": self.Secrets
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTOnNewCallbackResponse:
"""The result of executing the on_new_callback function for a payload type
Attributes:
AgentCallbackID (str): The Agent Callback UUID of the new callback
Success (bool): Did the function execute successfully or not
Error (str): If the function failed to execute, return an error message here
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
AgentCallbackID: str,
Success: bool = True,
Error: str = ""):
self.AgentCallbackID = AgentCallbackID
self.Success = Success
self.Error = Error
def to_json(self):
return {
"agent_callback_id": self.AgentCallbackID,
"success": self.Success,
"error": self.Error
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTTaskCompletionFunctionMessage:
"""A request to execute the completion function for a task or subtask
Note: Lowercase names are used in the __init__ function to auto-populate from JSON, but attributes are upper case.
Attributes:
TaskData (PTTaskMessageAllData): Information about this task and its callback, payload, build params, etc
CompletionFunctionName (str): The name of the completion function to execute
SubtaskGroup (str): If this task is part of a subtask group, this is the name of that group
SubtaskData (PTTaskMessageAllData): If this is a completion function from a subtask, then this is all the information about that subtask, not the current task.
Functions:
to_json(self): return dictionary form of class
"""
SubtaskData: PTTaskMessageAllData
def __init__(self,
args: TaskArguments,
task: dict,
function_name: str,
subtask: dict = None,
subtask_group_name: str = None,
**kwargs):
self.TaskData = PTTaskMessageAllData(**task, args=args)
self.CompletionFunctionName = function_name
self.SubtaskGroup = subtask_group_name
if subtask is not None:
self.SubtaskData = PTTaskMessageAllData(**subtask)
else:
self.SubtaskData = None
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"task": self.TaskData.to_json(),
"function_name": self.CompletionFunctionName,
"subtask": self.SubtaskData.to_json() if self.SubtaskData is not None else None,
"subtask_group_name": self.SubtaskGroup
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class PTTaskCompletionFunctionMessageResponse:
"""The result of executing the completion function for a task or subtask
Attributes:
TaskID (int): The ID of the task (auto filled in for you)
ParentTaskId (int): The ID of the parent task (auto filled in for you)
Success (bool): Did the completion function execute successfully or not
Error (str): If the completion function failed to execute, return an error message here
TaskStatus (str): If you want to update the status of the task to something other than default as a result of this execution, set the value here
DisplayParams (str): If you want to update the display parameters of the task, update them here
Stdout (str): If you want to save some output about the completion function without adding it to the normal output, save it here
Stderr (str): If you want to save some stderr about the completion function without adding it to the normal output, save it here
Completed (bool): If you want to mark the task as completed (so that the agent can't pick it up), set this to True. This is really only useful if this is a function executed as the result of a subtask completing, so you can prevent the callback from getting this task.
TokenID (int): If you want to add/update/remove the token id associated with this task, set that value here
CompletionFunctionName (str): If you want to set a completion function for this task, set the name here. Make sure it matches a corresponding entry in your command's definition.
Params (str): The task parameters that were sent down to the callback as part of the tasking
ParameterGroupName (str): The name of the parameter group associated with the arguments that were sent down to the callback
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
TaskID: int = 0,
ParentTaskId: int = 0,
Success: bool = True,
Error: str = None,
TaskStatus: str = None,
DisplayParams: str = None,
Stdout: str = None,
Stderr: str = None,
Completed: bool = None,
TokenID: int = None,
CompletionFunctionName: str = None,
Params: str = None,
ParameterGroupName: str = None,
**kwargs):
self.TaskID = TaskID
self.ParentTaskId = ParentTaskId
self.Success = Success
self.Error = Error
self.TaskStatus = TaskStatus
self.DisplayParams = DisplayParams
self.Stdout = Stdout
self.Stderr = Stderr
self.Completed = Completed
self.TokenID = TokenID
if self.TokenID is not None and self.TokenID <= 0:
self.TokenID = None
self.CompletionFunctionName = CompletionFunctionName
self.Params = Params
self.ParameterGroupName = ParameterGroupName
for k, v in kwargs.items():
logger.info(f"unknown kwarg {k} with value {v}")
def to_json(self):
return {
"task_id": self.TaskID,
"parent_task_id": self.ParentTaskId,
"success": self.Success,
"error": self.Error,
"task_status": self.TaskStatus,
"display_params": self.DisplayParams,
"stdout": self.Stdout,
"stderr": self.Stderr,
"completed": self.Completed,
"token_id": self.TokenID,
"completion_function_name": self.CompletionFunctionName,
"params": self.Params,
"parameter_group_name": self.ParameterGroupName
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
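A completion function receives a `PTTaskCompletionFunctionMessage` and returns a `PTTaskCompletionFunctionMessageResponse`; per the docstring above, `TaskID` and `ParentTaskId` are filled in for you. The sketch below illustrates that shape only. `CompletionResponseSketch` is a hypothetical stand-in with three of the response fields, `on_subtask_done` is a made-up function name, and the message is faked with `SimpleNamespace` objects carrying made-up IDs.

```python
import asyncio
from types import SimpleNamespace


class CompletionResponseSketch:
    """Hypothetical stand-in for PTTaskCompletionFunctionMessageResponse."""

    def __init__(self, Success: bool = True, Stdout: str = None, Completed: bool = None):
        self.Success = Success
        self.Stdout = Stdout
        self.Completed = Completed

    def to_json(self) -> dict:
        return {"success": self.Success, "stdout": self.Stdout, "completed": self.Completed}


async def on_subtask_done(message) -> CompletionResponseSketch:
    """Hypothetical completion function: note which subtask finished, then close the parent."""
    note = f"subtask {message.SubtaskData.Task.ID} finished for task {message.TaskData.Task.ID}"
    return CompletionResponseSketch(Success=True, Stdout=note, Completed=True)


# Stand-in for PTTaskCompletionFunctionMessage with made-up IDs.
message = SimpleNamespace(
    TaskData=SimpleNamespace(Task=SimpleNamespace(ID=10)),
    SubtaskData=SimpleNamespace(Task=SimpleNamespace(ID=11)),
)
result = asyncio.run(on_subtask_done(message)).to_json()
```

Writing the note to `Stdout` rather than the task's normal output follows the docstring's description of that field as a place for information about the completion function itself.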
class PTTaskProcessResponseMessageResponse:
"""The result of executing the process response function for a task
Attributes:
TaskID (int): The ID of the task
Success (bool): Did the function execute successfully or not
Error (str): If the function failed to execute, return an error message here
Functions:
to_json(self): return dictionary form of class
"""
def __init__(self,
TaskID: int,
Success: bool = True,
Error: str = ""):
self.TaskID = TaskID
self.Success = Success
self.Error = Error
def to_json(self):
return {
"task_id": self.TaskID,
"success": self.Success,
"error": self.Error
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
class CommandBase(metaclass=ABCMeta):
"""The base definition for a command that Mythic tracks for a Payload Type
Attributes:
cmd (str): The name of the command
needs_admin (bool): Indicating if the command needs elevated permissions to execute properly
help_cmd (str): A short help string for how to run the command
description (str): Description of what the command does and how it works
version (int): The current version of the command
author (str): The name or handle of the author of the command
attackmapping (list[str]): A list of MITRE ATT&CK Technique IDs (ex: T1033)
supported_ui_features (list[str]): A list of supported ui features (ex: "callback_table:exit")
browser_script (BrowserScript): An instance of BrowserScript if the command provides a browser script to manipulate its output
script_only (bool): Indicate if the command is only a script or if there's backing code within the agent
attributes (CommandAttributes): Additional attributes about the command such as if it's builtin, suggested, or anything else
completion_functions (dict): A dictionary mapping completion/subtask function names to the functions themselves so they can be called later
argument_class (TaskArguments.__class__): The class that's used to parse the future task's arguments into the command's parameters
agent_code_path (Path): The Path to the code for the agent (helpful for building and loading commands dynamically)
agent_browserscript_path (Path): The Path to where browser scripts are located for your payload type
Functions:
to_json:
return dictionary form of class
opsec_pre:
A function for checking for OPSEC issues before the command is executed
opsec_post:
A function for checking for OPSEC issues before a command is executed, but after the create_go_tasking function is executed
create_go_tasking:
The main function for doing additional processing of the task before it's ready for the agent to fetch it
process_response:
Optional additional processing of responses from the agent in any free-form format
"""
supported_ui_features: list[str] = []
browser_script: BrowserScript = None
script_only: bool = False
attributes: CommandAttributes = None
completion_functions: dict[
str, Callable[[PTTaskCompletionFunctionMessage], Awaitable[PTTaskCompletionFunctionMessageResponse]]] = {}
argument_class: TaskArguments.__class__
base_path: Path = Path(".")
agent_code_path: Path = base_path / "agent_code"
agent_browserscript_path: Path = base_path / "browser_scripts"
def __init__(self, agent_path: Path, agent_code_path: Path, agent_browserscript_path: Path):
self.base_path = agent_path
self.agent_code_path = agent_code_path
self.agent_browserscript_path = agent_browserscript_path
@property
@abstractmethod
def cmd(self):
pass
@property
@abstractmethod
def needs_admin(self):
pass
@property
@abstractmethod
def help_cmd(self):
pass
@property
@abstractmethod
def description(self):
pass
@property
@abstractmethod
def version(self):
pass
@property
@abstractmethod
def author(self):
pass
@property
@abstractmethod
def attackmapping(self):
pass
async def opsec_pre(self, taskData: PTTaskMessageAllData) -> PTTTaskOPSECPreTaskMessageResponse:
response = PTTTaskOPSECPreTaskMessageResponse(
TaskID=taskData.Task.ID, Success=True, OpsecPreBlocked=False,
OpsecPreMessage="Not implemented, passing by default",
)
return response
async def opsec_post(self, taskData: PTTaskMessageAllData) -> PTTTaskOPSECPostTaskMessageResponse:
response = PTTTaskOPSECPostTaskMessageResponse(
TaskID=taskData.Task.ID, Success=True, OpsecPostBlocked=False,
OpsecPostMessage="Not implemented, passing by default",
)
return response
async def create_tasking(self, task: MythicTask) -> MythicTask:
return task
async def process_response(self, task: PTTaskMessageAllData, response: any) -> PTTaskProcessResponseMessageResponse:
pass
def to_json(self):
params = self.argument_class("").to_json()
if self.browser_script is not None:
if isinstance(self.browser_script, list):
logger.error(f"{self.cmd}'s browserscript attribute should not be an array, but a single script")
sys.exit(1)
else:
try:
bscript = {"browserscript": self.browser_script.to_json(self.agent_browserscript_path)}
except Exception as e:
logger.error(f"Failed to get browser script for {self.cmd}:\n{e}")
bscript = {}
else:
bscript = {"browser_script": {}}
if self.attributes is None:
attributes = CommandAttributes()
else:
attributes = self.attributes
return {
"name": self.cmd,
"needs_admin_permission": self.needs_admin,
"help_string": self.help_cmd,
"description": self.description,
"version": self.version,
"supported_ui_features": self.supported_ui_features if self.supported_ui_features is not None else [],
"author": self.author,
"attack": self.attackmapping,
"parameters": params,
"attributes": attributes.to_json(),
"script_only": self.script_only if self.script_only is not None else False,
**bscript,
}
def __str__(self):
return json.dumps(self.to_json(), sort_keys=True, indent=2)
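`CommandBase` declares `cmd`, `needs_admin`, `help_cmd`, and the other required fields as abstract properties, so a command class cannot be instantiated until every one of them is defined. A plain class attribute with the same name is enough to satisfy the requirement. The sketch below demonstrates that mechanism with a hypothetical, trimmed-down base class (`CommandSketch`, two properties instead of seven); `Whoami` and `Incomplete` are made-up commands.

```python
from abc import ABCMeta, abstractmethod


class CommandSketch(metaclass=ABCMeta):
    """Hypothetical, trimmed-down stand-in for CommandBase."""

    @property
    @abstractmethod
    def cmd(self):
        pass

    @property
    @abstractmethod
    def needs_admin(self):
        pass

    def to_json(self) -> dict:
        return {"name": self.cmd, "needs_admin_permission": self.needs_admin}


class Whoami(CommandSketch):
    # Plain class attributes replace the abstract properties of the same name.
    cmd = "whoami"
    needs_admin = False


class Incomplete(CommandSketch):
    cmd = "broken"  # needs_admin is missing, so this class stays abstract


definition = Whoami().to_json()

try:
    Incomplete()
    instantiated = True
except TypeError:
    # ABCMeta refuses to instantiate a class with unimplemented abstract members.
    instantiated = False
```

The practical effect is that a command missing a required field fails when it is instantiated, rather than later when its definition is serialized.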
commands: dict[str, list[CommandBase]] = {}