Post

Building Your Arsenal: A Deep Dive into Automated Malware Collection

Building Your Arsenal: A Deep Dive into Automated Malware Collection

Building Your Arsenal: A Deep Dive into Automated Malware Collection

In the ever-evolving landscape of cyber threats, staying ahead of malicious actors requires a robust arsenal of tools and knowledge. Today, I’m going to unlock a powerful capability: automated malware sample collection and management. Whether you’re a seasoned threat hunter, a curious security researcher, or an aspiring malware analyst, this guide will equip you with the tools to build your own formidable malware database.


The Power of Automation in Malware Research

In today’s fast-paced threat landscape, manually collecting malware samples is like trying to drink from a firehose—it’s overwhelming and inefficient. Automation isn’t just a convenience; it’s a necessity. This Python script transforms the tedious task of manual malware collection into a streamlined process, allowing you to focus on what really matters: analysis and research.

This post will take you through an in-depth breakdown of a Python script that automates the collection, unzipping, and hashing of malware samples from MalwareBazaar, a well-known platform for gathering the latest malware samples. Let’s dive deep into how this script works and why each block of code is important!


The Complete Arsenal: My Automation Script

Let’s start by examining My complete weapon of choice - the Python script that makes all this possible:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import hashlib
import zipfile
import requests
from tqdm import tqdm
from termcolor import colored
from datetime import datetime, timedelta
import time

def download_file(date):
    filename = f"{date}.zip"
    file_url = f"https://datalake.abuse.ch/malware-bazaar/daily/{filename}"
    
    max_retries = 100
    retry_delay = 2  # seconds

    for attempt in range(max_retries):
        try:
            response = requests.head(file_url, timeout=10)
            if response.status_code != 200:
                print(colored(f"Failed to download the file '{filename}'. File may not exist for the specified date.", 'red'))
                return

            total_size = int(response.headers.get('content-length', 0))
            
            response = requests.get(file_url, stream=True, timeout=10)
            with open(filename, 'wb') as file, tqdm(
                desc=colored(f"Downloading {filename}", 'green'),
                total=total_size,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                colour='blue',
            ) as bar:
                for data in response.iter_content(chunk_size=4096):
                    file.write(data)
                    bar.update(len(data))
            
            print(colored(f"File '{filename}' downloaded successfully.", 'green'))
            return
        except requests.exceptions.RequestException as e:
            print(colored(f"Error downloading {filename}: {e}", 'red'))
            if attempt < max_retries - 1:
                print(colored(f"Retrying in {retry_delay} seconds...", 'yellow'))
                time.sleep(retry_delay)
            else:
                print(colored(f"Failed to download {filename} after {max_retries} attempts.", 'red'))
                return

def download_files_in_range(start_date, end_date):
    current_date = start_date
    while current_date <= end_date:
        download_file(current_date.strftime("%Y-%m-%d"))
        current_date += timedelta(days=1)

def unzip_files(password="infected"):
    os.makedirs("Downloaded Samples", exist_ok=True)
    for item in os.listdir():
        if item.endswith(".zip"):
            with zipfile.ZipFile(item, 'r') as zip_ref:
                try:
                    zip_ref.extractall("Downloaded Samples", pwd=password.encode())
                    print(colored(f"Unzipped {item} with default password.", 'green'))
                except RuntimeError as e:
                    print(colored(f"Error unzipping {item} with default password: {e}", 'red'))
                    new_password = input(colored("Enter the password for the encrypted ZIP files: ", 'yellow')).strip()
                    try:
                        zip_ref.extractall("Downloaded Samples", pwd=new_password.encode())
                        print(colored(f"Unzipped {item} with user-provided password.", 'green'))
                    except RuntimeError as e:
                        print(colored(f"Error unzipping {item} with user-provided password: {e}", 'red'))

def calculate_hash(file_path, algorithm):
    hash_func = None
    if algorithm == 'md5':
        hash_func = hashlib.md5()
    elif algorithm == 'sha1':
        hash_func = hashlib.sha1()
    elif algorithm == 'sha2':
        hash_func = hashlib.sha256()

    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_func.update(chunk)
    
    return hash_func.hexdigest()

def calculate_hashes(algorithm):
    hash_file = f"{algorithm}.txt"
    with open(hash_file, 'w') as f:
        for root, _, files in os.walk("Downloaded Samples"):
            for file in files:
                file_path = os.path.join(root, file)
                try:
                    hash_value = calculate_hash(file_path, algorithm)
                    f.write(f"{hash_value}\n")
                except OSError as e:
                    print(colored(f"Error calculating hash for {file_path}: {e}", 'red'))
    print(colored(f"Hashes calculated and saved in {hash_file}", 'green'))

def calculate_all_hashes():
    for algo in ['md5', 'sha1', 'sha2']:
        calculate_hashes(algo)

def main():
    choice = input(colored("Enter 'S' to download a single file or 'M' to download multiple files: ", 'yellow')).strip().upper()

    if choice == 'S':
        date = input(colored("Enter the date in YYYY-MM-DD format: ", 'yellow')).strip()
        download_file(date)
    elif choice == 'M':
        start_date_str = input(colored("Enter the start date in YYYY-MM-DD format: ", 'yellow')).strip()
        end_date_str = input(colored("Enter the end date in YYYY-MM-DD format: ", 'yellow')).strip()
        
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        
        download_files_in_range(start_date, end_date)
    else:
        print(colored("Invalid choice. Please enter 'S' or 'M'.", 'red'))
        return
    
    unzip_choice = input(colored("Do you want to unzip all downloaded files? (Y/N): ", 'yellow')).strip().upper()
    if unzip_choice == 'Y':
        unzip_files()
        
        hash_choice = input(colored("Do you want to calculate the hash of each file? (Y/N): ", 'yellow')).strip().upper()
        if hash_choice == 'Y':
            hash_algo_choice = input(colored("Enter the hashing algorithm (md5, sha1, sha2, all): ", 'yellow')).strip().lower()
            if hash_algo_choice in ['md5', 'sha1', 'sha2']:
                calculate_hashes(hash_algo_choice)
            elif hash_algo_choice == 'all':
                calculate_all_hashes()
            else:
                print(colored("Invalid hashing algorithm choice.", 'red'))

if __name__ == "__main__":
    main()

Diving Deep: A Code Block Analysis

Let’s dissect each component of our script and understand how this digital specimen collector works its magic. 🔬

From Line 1 to Line 9: Importing Libraries

1
2
3
4
5
6
7
8
import os
import hashlib
import zipfile
import requests
from tqdm import tqdm
from termcolor import colored
from datetime import datetime, timedelta
import time

The first 9 lines of the script are dedicated to importing the libraries that are essential for each function of the malware collection process.

  • Why is this critical? Without these imported modules, we wouldn’t have access to necessary functionality such as making HTTP requests, handling file manipulations, managing file compression, and generating cryptographic hashes for identifying malware samples.

Let’s break down the purpose of each imported library:

  1. os:
    • Provides a way of interacting with the operating system’s file system. We use os to create directories for storing our malware samples, check if files exist, and navigate through directories to locate the ZIP files and their uncompressed contents. For example, creating a folder called “Downloaded Samples” is handled by os.makedirs().
  2. hashlib:
    • This is the core of our hash calculation functionality. In malware analysis, hashes are used to verify that a file has not been altered. Hashes create unique, irreversible outputs from any given input file, making it easy to track malware samples across systems or identify known variants. We use MD5, SHA1, and SHA256—each producing progressively more secure hash values.
  3. zipfile:
    • Used for handling the extraction of compressed files (usually .zip). Malware samples are often distributed in password-protected ZIP files, which allows the researcher to control when and where the sample is exposed to their system. The zipfile module provides the ability to open, extract, and manipulate these files.
  4. requests:
    • This is one of the most important libraries in the script. Requests is a library that allows us to easily send HTTP requests to websites or APIs. We use requests to download the malware samples by communicating with MalwareBazaar’s daily dump server.
  5. tqdm:
    • This library isn’t essential for the functionality of the script, but it greatly enhances the user experience. tqdm is used to show a progress bar while downloading files, giving real-time feedback on how much of the file has been downloaded. Without this, the script would run silently, and the user wouldn’t know the download status until completion.
  6. termcolor:
    • Similar to tqdm, this isn’t necessary for the script to work, but it improves the readability of the output. termcolor allows us to print colored text in the terminal. For example, we print error messages in red and success messages in green to help differentiate the types of messages displayed.
  7. datetime and timedelta:
    • These libraries are used for handling dates and time ranges. Since we are downloading malware samples for specific dates or over a date range, the script needs a way to manipulate dates and iterate over them.
  8. time:
    • The time module is used to introduce delays between retries, making sure we don’t overwhelm the server by sending requests too quickly when a download fails. This is important to avoid getting blocked or causing unnecessary load on the server.

The Download Specialist

From Line 10 to Line 53: The download_file() Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def download_file(date):
    # Construct the filename and URL for the specified date
    filename = f"{date}.zip"
    file_url = f"https://datalake.abuse.ch/malware-bazaar/daily/{filename}"
    
    # Define the maximum number of retries and the delay between retries
    max_retries = 100
    retry_delay = 2  # seconds

    for attempt in range(max_retries):
        try:
            # Send a request to get the file size
            response = requests.head(file_url, timeout=10)
            if response.status_code != 200:
                print(colored(f"Failed to download the file '{filename}'. File may not exist for the specified date.", 'red'))
                return

            # Get the total file size in bytes
            total_size = int(response.headers.get('content-length', 0))
            
            # Download the file with progress bar
            response = requests.get(file_url, stream=True, timeout=10)
            with open(filename, 'wb') as file, tqdm(
                desc=colored(f"Downloading {filename}", 'green'),
                total=total_size,
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                colour='blue',
            ) as bar:
                for data in response.iter_content(chunk_size=4096):
                    file.write(data)
                    bar.update(len(data))
            
            print(colored(f"File '{filename}' downloaded successfully.", 'green'))
            return
        except requests.exceptions.RequestException as e:
            print(colored(f"Error downloading {filename}: {e}", 'red'))
            if attempt < max_retries - 1:
                print(colored(f"Retrying in {retry_delay} seconds...", 'yellow'))
                time.sleep(retry_delay)
            else:
                print(colored(f"Failed to download {filename} after {max_retries} attempts.", 'red'))
                return

In this block, the script focuses on downloading a specific file (a ZIP archive) containing malware samples for a given date. Here’s what each part does:

  1. Filename Construction:
    • The filename is constructed dynamically using the input date. The file is named in the format YYYY-MM-DD.zip. This format is consistent with how MalwareBazaar stores its daily dumps, ensuring that our script can easily match the requested date with the available file.
  2. Dynamic URL Generation:
    • The script appends the constructed filename to a base URL. The base URL points to MalwareBazaar’s daily repository, and the full URL specifies the file for a particular day.
  3. Retry Mechanism:
    • This part of the script allows the user to retry a failed download multiple times (up to 100 attempts by default). The retry logic is built to ensure that even if there’s a temporary issue—like a server hiccup or a bad network connection—the script will keep trying to download the file after waiting for 2 seconds between each attempt.
  4. File Size Check and Progress Bar:
    • Before downloading the file, the script sends a HEAD request to check if the file exists. This method doesn’t download the content of the file; instead, it only retrieves the headers, including the file size. Knowing the size of the file allows us to display an accurate progress bar using tqdm.

Why this is important: By knowing the file size, we can track the progress of the download, giving users confidence that the script is working as expected. Without the progress bar, users might think the script has stalled during large downloads.


The Date Range Commander

From Line 55 to Line 59: Downloading Multiple Files with download_files_in_range()

1
2
3
4
5
def download_files_in_range(start_date, end_date):
    current_date = start_date
    while current_date <= end_date:
        download_file(current_date.strftime("%Y-%m-%d"))
        current_date += timedelta(days=1)

This part of the script deals with automating the download of malware samples over a specified date range. It is crucial for researchers who are looking to collect malware samples over an extended period, such as a month or several months.

  1. Date Iteration:
    • The script starts with the start_date and ends at end_date. Using the timedelta library, the script increments the date by 1 day after each iteration until it reaches the end_date.
  2. Calling the Download Function:
    • For each date, the script calls the download_file() function, which downloads the sample for that day. This way, the process is entirely automated. Instead of manually downloading each file, the script handles the workload for the researcher.

Why this is important: In malware research, analyzing patterns and trends is critical. Collecting malware over a longer period allows researchers to track malware evolution, identify new variants, and build comprehensive datasets for training machine learning models or for behavioral analysis.


The Extraction Specialist

From Line 61 to Line 76: Unzipping Password-Protected Files with unzip_files()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def unzip_files(password="infected"):
    os.makedirs("Downloaded Samples", exist_ok=True)
    for item in os.listdir():
        if item.endswith(".zip"):
            with zipfile.ZipFile(item, 'r') as zip_ref:
                try:
                    zip_ref.extractall("Downloaded Samples", pwd=password.encode())
                    print(colored(f"Unzipped {item} with default password.", 'green'))
                except RuntimeError as e:
                    print(colored(f"Error unzipping {item} with default password: {e}", 'red'))
                    new_password = input(colored("Enter the password for the encrypted ZIP files: ", 'yellow')).strip()
                    try:
                        zip_ref.extractall("Downloaded Samples", pwd=new_password.encode())
                        print(colored(f"Unzipped {item} with user-provided password.", 'green'))
                    except RuntimeError as e:
                        print(colored(f"Error unzipping {item} with user-provided password: {e}", 'red'))

Once the files are downloaded, they need to be extracted before analysis. Since these files are usually password-protected for security reasons, the script includes functionality to handle this automatically.

  1. Directory Creation:
    • Before extracting the files, the script checks if a folder called Downloaded Samples exists. If not, it creates the directory. This helps keep the files organized and ensures that the samples don’t clutter the working directory.
  2. Password Handling:
    • Malware files from MalwareBazaar are typically compressed and protected with the default password "infected". The script automatically tries this password to extract the files.
  3. Error Handling:
    • In cases where the default password doesn’t work (for example, if the file uses a different password), the script catches the error and prompts the user to manually enter the correct password. This allows for flexibility in case the protection mechanism varies between files.
  4. ZIP File Processing:
    • Each file in the directory is processed if it ends with .zip. This ensures that only ZIP archives are unzipped, avoiding any accidental attempts to unzip other file types.

Why this is important: Extracting the malware samples safely is a critical step in the analysis process. Automating this ensures that the user can quickly access the contents of the ZIP files while maintaining security by handling password-protected files.


The Cryptographic Laboratory

From Line 78 to Line 91: Calculating File Hashes with calculate_hash()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def calculate_hash(file_path, algorithm):
    hash_func = None
    if algorithm == 'md5':
        hash_func = hashlib.md5()
    elif algorithm == 'sha1':
        hash_func = hashlib.sha1()
    elif algorithm == 'sha2':
        hash_func = hashlib.sha256()

    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_func.update(chunk)
    
    return hash_func.hexdigest()

This block handles generating cryptographic hashes for the malware samples. Hashing is one of the most important steps in malware analysis because it generates a unique fingerprint for each file, allowing analysts to identify malware variants, track samples, and compare files with known datasets.

  1. Choosing the Algorithm:
    • The calculate_hash() function allows the user to choose from three algorithms—MD5, SHA1, and SHA256. Each of these is a hash function that produces a unique value for a given file, but with different levels of security.

    • MD5: Produces a 128-bit hash value. While fast and commonly used, MD5 is not recommended for security purposes due to its vulnerability to collision attacks (i.e., two different inputs producing the same hash).

    • SHA1: Produces a 160-bit hash value. Although more secure than MD5, SHA1 has also been deprecated for security purposes in many contexts, as it too can be vulnerable to collision attacks.

    • SHA256: Produces a 256-bit hash value and is part of the SHA-2 family. It’s much more secure and is still widely used in modern cryptographic applications.

  2. Reading the File in Chunks:
    • The file is read in chunks of 4KB. This ensures that even very large files can be processed without overloading the system’s memory. Instead of reading the entire file at once, the function processes it in smaller, more manageable pieces, allowing the hash to be updated incrementally.
  3. Hash Function Updating:
    • As each chunk of the file is read, the hash function is updated with the chunk’s data. Once the entire file has been processed, the hash function returns the final hash value for the file.

Why this is important: Hashing is a fundamental part of malware analysis. By generating these unique fingerprints for malware samples, analysts can track malware across different environments and quickly compare files to detect duplicates or previously known threats.


The Hash Calculation Factory

From Line 140 to Line 160: Calculating Hashes for All Files with calculate_hashes()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def calculate_hashes(algorithm):
    hash_file = f"{algorithm}.txt"
    with open(hash_file, 'w') as f:
        for root, _, files in os.walk("Downloaded Samples"):
            for file in files:
                file_path = os.path.join(root, file)
                try:
                    hash_value = calculate_hash(file_path, algorithm)
                    f.write(f"{hash_value}\n")  # Only write the hash value, not the file name
                except OSError as e:
                    print(colored(f"Error calculating hash for {file_path}: {e}", 'red'))
    print(colored(f"Hashes calculated and saved in {hash_file}", 'green'))

def calculate_all_hashes():
    for algo in ['md5', 'sha1', 'sha2']:
        calculate_hashes(algo)

This block is responsible for generating hashes for all the malware samples in the Downloaded Samples directory. This function automates the task of computing hashes for multiple files, allowing for bulk processing.

  1. Directory Traversal:
    • The script recursively walks through the entire Downloaded Samples directory and processes every file. This is useful if you’re dealing with multiple ZIP files that have been extracted into subfolders.
  2. Organized Output:
    • The hash values are written to text files, one for each hash algorithm (e.g., md5.txt, sha1.txt, sha256.txt). This makes it easy to cross-reference and search for specific hashes later.
  3. Error Handling:
    • The script also handles cases where a file can’t be processed, logging the error but continuing with the next file. This ensures that the entire process doesn’t stop if there’s an issue with one file.

Why this is important: When dealing with large volumes of malware samples, automating the process of generating hashes is a huge time saver. This also ensures consistency and prevents errors that might arise from manually generating hashes for each file.


The Command Center

From Line 111 Onward: The main() Function and User Input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def main():
    choice = input(colored("Enter 'S' to download a single file or 'M' to download multiple files: ", 'yellow')).strip().upper()

    if choice == 'S':
        date = input(colored("Enter the date in YYYY-MM-DD format: ", 'yellow')).strip()
        download_file(date)
    elif choice == 'M':
        start_date_str = input(colored("Enter the start date in YYYY-MM-DD format: ", 'yellow')).strip()
        end_date_str = input(colored("Enter the end date in YYYY-MM-DD format: ", 'yellow')).strip()
        
        # Parse the dates
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        
        download_files_in_range(start_date, end_date)
    else:
        print(colored("Invalid choice. Please enter 'S' or 'M'.", 'red'))
        return
    
    unzip_choice = input(colored("Do you want to unzip all downloaded files? (Y/N): ", 'yellow')).strip().upper()
    if unzip_choice == 'Y':
        unzip_files()
        
        hash_choice = input(colored("Do you want to calculate the hash of each file? (Y/N): ", 'yellow')).strip().upper()
        if hash_choice == 'Y':
            hash_algo_choice = input(colored("Enter the hashing algorithm (md5, sha1, sha2, all): ", 'yellow')).strip().lower()
            if hash_algo_choice in ['md5', 'sha1', 'sha2']:
                calculate_hashes(hash_algo_choice)
            elif hash_algo_choice == 'all':
                calculate_all_hashes()
            else:
                print(colored("Invalid hashing algorithm choice.", 'red'))

if __name__ == "__main__":
    main()

Finally, we have the main() function, which orchestrates the entire process by providing an interactive interface for the user. This part of the script lets the user choose what they want to do—download files, unzip them, generate hashes, or all three.

  1. Single-Day or Multi-Day Download:
    • The script first asks the user whether they want to download a single file or multiple files over a date range. Depending on the choice, the script either calls download_file() or download_files_in_range(). This gives the user flexibility in how they want to collect the malware samples.
  2. Unzipping and Hashing Options:
    • After the files are downloaded, the script asks if the user wants to unzip the files and/or generate hashes. If the user chooses to do so, the script calls the appropriate functions to complete these tasks. Again, this offers flexibility, allowing the user to perform only the steps they need.

Why this is important: This interactive approach makes the script user-friendly. Instead of hardcoding values, the user is prompted to make decisions, giving them more control over how the script operates. This makes the tool adaptable to different workflows and use cases.


Advanced Usage Techniques

Single-Day Precision Strike

For targeting specific dates:

1
2
3
$ python malware_collector.py
> Enter 'S' to download a single file or 'M' to download multiple files: S
> Enter the date in YYYY-MM-DD format: 2024-10-02

Multi-Day Campaign

For extended collection operations:

1
2
3
4
$ python malware_collector.py
> Enter 'S' to download a single file or 'M' to download multiple files: M
> Enter the start date in YYYY-MM-DD format: 2024-09-01
> Enter the end date in YYYY-MM-DD format: 2024-09-30

Security Considerations and Best Practices

1. Operational Security

  • 🔒 Always operate in isolated environments
  • 💻 Use dedicated analysis VMs
  • 🛡️ Maintain network segregation

2. Sample Management

  • 📁 Implement proper sample categorization
  • 📝 Maintain detailed operation logs
  • 💾 Regular database backups
  • 📜 Understand applicable regulations
  • 🤝 Follow responsible disclosure practices
  • 📊 Document your research methodology

Future Enhancements

1. Advanced Collection Capabilities

  • 🌐 Multi-source integration
  • 🔄 Real-time feed processing
  • 🏷️ Automatic tagging system

2. Analysis Integration

  • 🔍 Static analysis automation
  • 📊 Behavior classification
  • 🌐 Reputation checking

3. Database Enhancement

  • 💽 SQL/NoSQL integration
  • 🔍 Advanced search capabilities
  • 📈 Trend analysis features

Conclusion

Armed with this automated collection system, you are now equipped to build and maintain a formidable malware database. Whether you are just beginning your journey as a malware analyst or are a seasoned researcher, this tool will help you stay one step ahead in the world of cybersecurity.

Remember, in the realm of cyber threats, knowledge is power, and a robust malware database is the foundation of that knowledge.


Security Advisory: Exercise caution when handling malware samples. Always use dedicated, isolated environments for analysis.

Additional Resources

Share this knowledge responsibly and happy hunting! 🎯

This post is licensed under CC BY 4.0 by the author.