This article contains instructions on how to limit the bandwidth (incoming and outgoing traffic speeds) of a VM once the VM exceeds its traffic threshold.

The current version of the VMmanager 6 processing module in the BILLmanager 6 platform does not support the bandwidth limitation function. You can implement this function using a separate Python script. The script remotely connects to the platform and changes the settings of a VM that has exceeded its traffic threshold.

Script requirements


  • The script can run on any server that has access to the platform using the HTTPS protocol.
  • To run the script, you will need to specify the platform administrator's e-mail and password.
  • For the script to run, a valid SSL certificate must be installed for the domain name of the platform.

Setting restrictions


To set the limits for the bandwidth of the channel:

  1. Prepare the environment for running Python scripts. See the official Python documentation for details.
  2. Create a Python script called limits.py with the following contents:

    #!/usr/bin/python3
    
    import argparse
    import json
    import logging
    import time
    
    from datetime import datetime
    from http import HTTPStatus
    from typing import Callable, Dict
    
    import requests
    
    
    def retry(fn, **kwargs):
        """Call ``fn(**kwargs)``, repeating the call while the service is unavailable.

        The first call is made immediately. While the response status is
        HTTP 503 (Service Unavailable), up to 10 further calls are made, with a
        pause of ``0.1 * attempt`` seconds before each one. The most recent
        response is returned, whatever its status code is.

        ``kwargs`` must contain ``url`` whenever a retry happens, because the
        value is used in the log message.
        """
        unavailable = HTTPStatus.SERVICE_UNAVAILABLE
        max_attempts = 10

        response = fn(**kwargs)
        attempt = 1
        while response.status_code == unavailable and attempt <= max_attempts:
            # Linearly growing pause: 0.1 s, 0.2 s, ... before each new attempt.
            time.sleep(0.1 * attempt)
            logging.info(f"Trying to execute request to {kwargs['url']} attempt {attempt}")
            response = fn(**kwargs)
            attempt += 1
        return response
    
    
    def make_json_error(code: int, text: str):
        """Build the error payload returned in place of a parsed API response.

        The result is a dict holding the numeric ``error_code`` and the
        ``error`` description.
        """
        return dict(error_code=code, error=text)
    
    
    def execute_request(request: Callable, path, output=False, token=None, **kwargs):
        """Send one HTTP request through ``retry`` and normalise the outcome.

        ``request`` is called with ``url=path``, the session cookies/headers and
        any extra ``kwargs``. When ``token`` is truthy it is sent both as the
        ``ses6`` cookie and as the ``x-xsrf-token`` header.

        Returns:
            * an error dict (see ``make_json_error``) when the status code is
              neither 200 nor 201, or when ``output`` is set and the body is not
              valid JSON;
            * the decoded JSON body when ``output`` is set;
            * ``None`` otherwise.
        """
        cookies = {"ses6": token} if token else {}
        headers = {"x-xsrf-token": token} if token else {}

        logging.info(f"Executing request to {path}")
        response = retry(request, url=path, cookies=cookies, headers=headers, **kwargs)
        content = response.content.decode("utf-8")

        succeeded = response.status_code in (HTTPStatus.OK, HTTPStatus.CREATED)
        if not succeeded:
            logging.error(f"Return code {response.status_code}: {content}")
            return make_json_error(response.status_code, content)

        if not output:
            return None

        try:
            return json.loads(content)
        except json.JSONDecodeError as e:
            logging.error(f"Error while parsing response: {content}")
            return make_json_error(HTTPStatus.INTERNAL_SERVER_ERROR.value, e.msg)
    
    
    class API:
        """Small client for the platform HTTP API with session handling.

        Meant to be used as a context manager: entering authenticates with the
        given credentials and stores the session token, leaving deletes that
        token on the platform.
        """

        def __init__(self, url, email, password):
            """Store the platform base URL and the credentials; no request is sent."""
            self.url = url
            self.email = email
            self.password = password
            # Session token; set by a successful authentication.
            self.token = None

        def get(self, path, params=None):
            """GET ``path`` (relative to the base URL); return parsed JSON or an error dict."""
            return execute_request(requests.get, f"{self.url}{path}", output=True, token=self.token, params=params)

        def post(self, path, data=None):
            """POST ``data`` as JSON to ``path``; return parsed JSON or an error dict.

            When ``data`` is omitted an empty JSON object is sent.
            """
            # ``None`` replaces the former mutable ``{}`` default, which was one
            # dict object shared between all calls.
            payload = {} if data is None else data
            return execute_request(requests.post, f"{self.url}{path}", output=True, token=self.token, json=payload)

        def delete(self, path):
            """DELETE ``path``; return ``None`` on success or an error dict."""
            return execute_request(requests.delete, f"{self.url}{path}", token=self.token)

        def __auth(self):
            """Request a session token and store it in ``self.token``.

            Raises:
                RuntimeError: if the platform reports an error or the response
                    does not contain a token.
            """
            result = self.post("/auth/v4/public/token", {"email": self.email, "password": self.password})
            if not isinstance(result, dict):
                raise RuntimeError(f"Unexpected authentication response of type {type(result).__name__}")
            if result.get("error"):
                raise RuntimeError(result["error"])

            token = result.get("token")
            if not token:
                raise RuntimeError("Authentication response does not contain a token")
            self.token = token

        def __enter__(self):
            self.__auth()
            return self

        def __exit__(self, e_type, e_value, e_traceback):
            if self.token:
                self.delete(f"/auth/v4/token/{self.token}")
                # Forget the token so it is not reused after the session was closed.
                self.token = None
    
    
    def parse_args() -> argparse.Namespace:
        """Parse the command-line options of the script.

        ``--url``, ``--email``, ``--password`` and ``--vmid`` are mandatory;
        ``--threshold-gib`` (default 1024) and ``--limit-mbitps`` (default 10)
        are optional integers.
        """
        parser = argparse.ArgumentParser(description="Check VM bandwidth")

        # (flag, type, metavar, help) for every mandatory option, in help order.
        required_options = (
            ("--url", str, "url", 'Platform url in format "https://<your_domain>"'),
            ("--email", str, "email", "Platform administrator email"),
            ("--password", str, "password", "Platform administrator password"),
            ("--vmid", int, "vmid", "Id of the VM"),
        )
        for flag, kind, meta, description in required_options:
            parser.add_argument(flag, type=kind, metavar=meta, help=description, required=True)

        # (flag, metavar, help, default) for the optional integer settings.
        optional_options = (
            ("--threshold-gib", "threshold_gib", "VM bandwidth threshold, in GiB", 1024),
            ("--limit-mbitps", "limit_mbitps", "VM bandwidth limit after reaching threshold, in Mbit/s", 10),
        )
        for flag, meta, description, fallback in optional_options:
            parser.add_argument(flag, type=int, metavar=meta, help=description, default=fallback)

        return parser.parse_args()
    
    
    def _sum_metric_bytes(api, vmid, target, start, end):
        """Return the sum of the ``target`` metric datapoints of a VM.

        Queries ``/vm/v3/host/<vmid>/metrics`` for the period ``start`` ..
        ``end`` and adds up the first element of every datapoint, skipping
        empty values.

        NOTE(review): assumes a successful response is a list whose first item
        holds ``datapoints`` as sequences with the value at index 0 - confirm
        against the platform API.

        Raises:
            RuntimeError: if the platform reports an error or answers with an
                object that is not a list.
        """
        response = api.get(f"/vm/v3/host/{vmid}/metrics", params={
            "target": target,
            "from": start,
            "until": end,
            "output": "single",
            "interval": "1month",
        })
        logging.debug(response)

        if isinstance(response, dict) and response.get("error"):
            raise RuntimeError(response["error"])
        if not response:
            # Nothing was reported for the period.
            return 0
        if not isinstance(response, list):
            raise RuntimeError(f"Unexpected metrics response for {target}: {response}")

        datapoints = response[0].get("datapoints", [])
        return sum(point[0] for point in datapoints if point[0])


    def main():
        """Limit the bandwidth of a VM that exceeded its monthly traffic threshold.

        Sums the received and transmitted traffic of the VM since the first day
        of the current month and, when the total reaches ``--threshold-gib``,
        sets both the incoming and outgoing speed of the VM to
        ``--limit-mbitps``.

        Raises:
            RuntimeError: if authentication, a metrics request or the request
                that applies the limit fails.
        """
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)

        args = parse_args()
        # Show the options for troubleshooting, but never the administrator password.
        logging.debug({**vars(args), "password": "***"})

        # One clock reading for both bounds keeps them in the same month.
        time_format = '%H:%M_%Y%m%d'
        current = datetime.now()
        now = current.strftime(time_format)
        first_day = current.replace(day=1, hour=0, minute=0).strftime(time_format)

        logging.info(f"Checking bandwidth from {first_day} to {now} for VM {args.vmid}")

        with API(args.url, args.email, args.password) as api:
            summary_bytes = sum(
                _sum_metric_bytes(api, args.vmid, target, first_day, now)
                for target in ("net_rx_summary", "net_tx_summary")
            )

            if summary_bytes / 2**30 < args.threshold_gib:
                logging.info(f"Threshold {args.threshold_gib} GiB not reached, nothing to do")
                return

            logging.info(f"Threshold {args.threshold_gib} GiB reached, limiting speed to {args.limit_mbitps}")
            result = api.post(f"/vm/v3/host/{args.vmid}/resource", data={
                "net_in_mbitps": args.limit_mbitps,
                "net_out_mbitps": args.limit_mbitps,
            })
            # A failed limit must not pass silently: report it like the metric errors.
            if isinstance(result, dict) and result.get("error"):
                raise RuntimeError(result["error"])
            logging.info(result)
    
    
    # Run the bandwidth check only when the file is executed as a script,
    # not when it is imported as a module.
    if __name__ == '__main__':
        main()
  3. Run the script:

    python3 limits.py --url <platform_url> --email <admin_email> --password <admin_pass> --vmid <vm_id> --threshold-gib <threshold> --limit-mbitps <limit>

    <platform_url> — IP address or domain name of the server with VMmanager in the format https://domain.com

    <admin_email> — VMmanager admin email

    <admin_pass> — VMmanager admin password

    <vm_id> — VM id

    <threshold> — maximum total traffic volume (incoming plus outgoing) since the beginning of the current month, GiB. Optional parameter. The default value is 1024.

    <limit> — bandwidth after exceeding the threshold, Mbit/s. Optional parameter. The default value is 10.