Teams ‐ Ceph Dashboard ‐ Best Practices
Pedro Gonzalez Gomez edited this page 2025-10-28 12:25:28 +01:00

Important

This document is still WIP

Note

GOOD and BAD indicate practices that MUST be followed and avoided.

BETTER and WORSE indicate practices that SHOULD be followed or avoided.

Back-end (Python)

Indentation

Avoid PEP8's opening-delimiter alignment

It generates artificial whitespacing, and makes refactoring harder), and use extra 4-space indentation:

# AVOID: Aligned with opening delimiter.
def check_cluster_connection(url: str, payload: bytes, username: str,
                             ssl_verify: bool = True, ssl_certificate: Optional[bytes] = None):
    print(...)

# BETTER: Regular extra 4-space indentation
def check_cluster_connection(
       url: str,
       payload: bytes,
       username: str,
       ssl_verify: bool = True,
       ssl_certificate: Optional[bytes] = None,
):
    print(...)

Data structures

Dicts

dict should ONLY be used for associative indexing of uniform types (e.g.: Dict[str, bool]), NEVER for nested, arbitrary or complex objects (e.g.: Dict[str, Any] or Dict[Any, Any]), or config/Plain Old Python Objects (derived from Java's POJOs) or Data Transfer Objects (DTOs):

# BAD
http_service = {
       'protocol': protocol,
       'addr': addr,
       'port': port,
}

# GOOD
active_users: Dict[str, bool] = {'user1': True, 'user2': False, ...}
  • Neutral: unlike user-defined class, dict doesn't need initialization logic. However this has improved with newer data types like NamedTuple or dataclass, which automatically generate __init__ and other magic methods.
  • Neutral: dicts can be easily serialized as JSON, while user-defined classes require custom serialization methods. However, this has changed with NamedTuple and dataclass, which provide helpers for serialization (e.g.: named_tuple._asdict() or dataclasses.asdict()).
  • Cons: dicts enforce lazy/loose typing (e.g.: http_config: Dict[str, Any] vs. config: HTTPService). Use structured classes instead (dataclass, NamedTuple or, less ideally, TypedDict):
    # ALTERNATIVES: (from best to worst):
    @dataclasses.dataclass  # Python >= 3.7, although 3.6 dataclasses backport
    class HTTPService:
        protocol: Literal['http', 'https']
        addr: IPAddress
        port: int = 80
    
    class HTTPService(typing.NamedTuple):  # NamedTuples are immutable
        protocol: Literal['http', 'https']
        addr: IPAddress
        port: int = 80
    
    class HTTPService(typing.TypedDict):  # Python >= 3.8
        protocol: Literal['http', 'https']
        addr: IPAddress
        port: NotRequired[int]  # TypedDict doesn't support default values
    
  • Cons: "property" access in dicts cannot be checked statically (e.g: note the typo in http_service['portocol'] vs. http_service.portocol which would raise an error in IDEs/linters), except for TypedDicts.

Controllers

"Thin Controllers, Fat Services"

Controllers should just:

  • parse, validate or transform request data (HTTP headers, cookies, query params, body data) into internal data structures,
  • invoke one or more services to process that data,
  • adapt the data returned by the service/s to the expected response format,
  • handle and report errors.

Example

# FAT MODEL, THIN SERVICES
    @Endpoint('POST')
    @CreatePermission
    def auth(self, url: str, cluster_alias: str, username: str,
             password=None, hub_url=None, ssl_verify=False, ssl_certificate=None, ttl=None):
        try:
            hub_fsid = mgr.get('config')['fsid']
        except KeyError:
            hub_fsid = ''

        if password:
            payload = {
                'username': username,
                'password': password,
                'ttl': ttl
            }
            cluster_token = self.check_cluster_connection(url, payload, username,
                                                          ssl_verify, ssl_certificate,
                                                          'connect')

            cors_endpoints_string = self.get_cors_endpoints_string(hub_url)

            self._proxy('PUT', url, 'ui-api/multi-cluster/set_cors_endpoint',
                        payload={'url': cors_endpoints_string}, token=cluster_token,
                        verify=ssl_verify, cert=ssl_certificate)

            fsid = self._proxy('GET', url, 'api/health/get_cluster_fsid', token=cluster_token,
                               verify=ssl_verify, cert=ssl_certificate)

            managed_by_clusters_content = self._proxy('GET', url,
                                                      'api/settings/MANAGED_BY_CLUSTERS',
                                                      token=cluster_token,
                                                      verify=ssl_verify, cert=ssl_certificate)

            managed_by_clusters_config = managed_by_clusters_content['value']

            if managed_by_clusters_config is not None:
                managed_by_clusters_config.append({'url': hub_url, 'fsid': hub_fsid})

            self._proxy('PUT', url, 'api/settings/MANAGED_BY_CLUSTERS',
                        payload={'value': managed_by_clusters_config}, token=cluster_token,
                        verify=ssl_verify, cert=ssl_certificate)

            # add prometheus targets
            prometheus_url = self._proxy('GET', url, 'api/multi-cluster/get_prometheus_api_url',
                                         token=cluster_token, verify=ssl_verify,
                                         cert=ssl_certificate)
            logger.info('prometheus_url: %s', prometheus_url)
            prometheus_access_info = self._proxy('GET', url,
                                                 'ui-api/multi-cluster/get_prometheus_access_info',  # noqa E501 #pylint: disable=line-too-long
                                                 token=cluster_token, verify=ssl_verify,
                                                 cert=ssl_certificate)

            _set_prometheus_targets(prometheus_url)

            self.set_multi_cluster_config(fsid, username, url, cluster_alias,
                                          cluster_token, prometheus_url, ssl_verify,
                                          ssl_certificate, prometheus_access_info)
            return True
        return False
# THIN MODEL, FAT SERVICE
    @Endpoint('POST')
    @CreatePermission
    def auth(self, url: str, cluster_alias: str, username: str,
             password=None, hub_url=None, ssl_verify=False, ssl_certificate=None, ttl=None):
        hub_fsid = mgr.get('config').get('fsid','')

        if password:
            multi_cluster_service.configure(url, username, password, ttl, hub_url, hub_fsid, ssl_verify, ssl_certificate)
            return True
        else:
            return False

Algorithms

Don't reassign variable names

Don't reuse/reassign variable names, especially when refactoring code.

# ORIGINAL CODE
def get_url(url):
  function_that_expects_scheme_in_url(url)
  ...

# REFACTORED CODE
def get_url(url):
  url = url.strip("http://")
  new_funcion_that_expects_schemeless_url(url)
  ...
  function_that_expects_scheme_in_url(url)  # FAILURE
  ...

# BETTER REFACTORED CODE
def get_url(url):
  schemeless_url = url.strip("http://")
  new_funcion_that_expects_schemeless_url(schemeless_url)
  ...
  function_that_expects_scheme_in_url(url)  # GOOD
  ...

Logging

Log Levels

  • CRITICAL: for events that cause Dashboard module to stop/fail to load (e.g.: trying to bind Dashboard server to a busy port).
  • ERROR: for events that cause a client request to fail (HTTP status 4xx, 5xx). The cause of the error should be clearly explained and/or how to fix it.
  • WARNING: for conditions that might result in an error or undesired behaviour. If operator intervention is expected, the message should clearly indicate how to fix the problematic condition.
  • INFO: for events and conditions that are meaningful to a Ceph operator or in high-level troubleshooting (e.g.: a component/service status changes, a user updates their password, ...).
  • DEBUG: for developer-relevant information. That said, log traces should be added carefully (e.g.: log variable changes rather than constantly polling & dumping the same variable value).

Do NOT log sensitive information

Hide password, hashes, tokens, keys, etc.

Don't pass the logger to functions/methods

Logging is cross-cutting concern that doesn't fit in OOP, so it's always better to use loggers as:

  • module variables:
    logger = logging.getLogger("my module")
    
    def function():
        logger.debug("Entered 'function'...")
    
  • instance members:
    class Class:
        def __init__(self)
            self.logger = logging.getLogger("Class")
    
        def method(self):
            self.logger.debug("Entered 'Class.method'...")
    
  • decorators (aspect-oriented programming):
    @log
    def function(...):
    

Use logger.exception

For logging exception messages plus tracebacks:

except Exception as e:
-    logger.error("Something weird happened: %r", e)
+    logger.exception("Something weird happened")

Error handling

"Easier to Ask for Forgiveness than Permission" (EAFP):

# WORSE: this is equivalent to "return data["key"] if "key" in data ..."
return data.get("key", {}).get("subkey", {}).get("subsubkey", {})

# BETTER
try:
    return data["key"]["subkey"]["subsubkey"]  # this is not a good use of dicts (see 2.i)
except (KeyError, TypeError):
    return {}

Chained exceptions

When catching and throwing new exceptions, use from to indicate that the latter is consequence of the former:

# BAD
except FileNotFoundError as e:
    raise DashboardException("Config file not found)
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 1, in <module>
>>> FileNotFoundError: [Errno 2] No such file or directory: 'config.ini'
>>> 
>>> During handling of the above exception, another exception occurred:
>>> 
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 2, in <module>
>>> DashboardException: Config file not found

# GOOD
except FileNotFoundError as e:
    raise DashboardException("Config file not found) from e
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 1, in <module>
>>> FileNotFoundError: [Errno 2] No such file or directory: 'config.ini'
>>> 
>>> The above exception was the direct cause of the following exception:
>>> 
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 2, in <module>
>>> DashboardException: Config file not found

Add notes

[Python >=3.11] It's better add context to the exceptions instead of throwing new ones [PEP]:

# WORSE
except FileNotFoundError as e:
    raise OtherCustomError(f"File not found: {e}") from e

# BETTER
except FileNotFoundError as e:
    e.add_note("Config file not found")
    raise

Comments

Comments represent failure or impossibility to express our ideas in the code.

Weaknesses

  • Code fails to be self-explanatory.
# WORSE
# iterate over images and return the total usage
def calculate(list):
    for l in list:
        value += l['usage']  # Add each image usage value to the total
    return value

# BETTER
def sum_images_usage(images: List[image_struc])
    return sum(image['usage'] for image in images)
  • Misleading, comments that do not reflect what the code does. Iterating commented code without properly modifiying the comment will lead to misdirections
# CODE EXAMPLE:
# Iterate through _daemons.values() to find the daemon with the
# matching zonegroup_id
for daemon in RgwClient._daemons.values():
    if daemon.zonegroup_id == default_zonegroup:
        daemon_name = daemon.name
    break
# BAD -- Code is updated but comments are not
# Iterate through _daemons.values() to find the daemon with the
# matching zonegroup_id
for daemon in RgwClient._active_daemons.values():
    if daemon.zonegroup_id == default_zonegroup:
        daemon_name = daemon.name
    elif daemon.zonegroup_id == custom_id:
        daemon_name = daemon.name
    break
# BETTER -- Code and comments are updated
# Iterate through _active_daemons.values() to find the daemon with the
# matching zonegroup_id or custom_id
for daemon in RgwClient._active_daemons.values():
    if daemon.zonegroup_id == default_zonegroup:
        daemon_name = daemon.name
    elif daemon.zonegroup_id == custom_id:
        daemon_name = daemon.name
    break
# GOOD -- Code is self-explanatory
def get_active_daemon_by_id(id: str):
    for daemon in RgwClient._active_daemons.values():
        if daemon.zonegroup_id == id:
            daemon_name = daemon.name
        return daemon_name

Strengths

  • Necessity: Comments might be the only way to inform the future reader of the code of a necesity, for example when circumventing a third party library issue.
# We add ==== as padding to ignore the requirement to have correct padding in
# the urlsafe_b64decode method.
return json.loads(base64.urlsafe_b64decode(encoded_segment + "===="))
  • Docstrings: Docstrings, documentation embedded in the code. A string placed with the intention of documenting a part of the logic. Documentation is meant to represent explanation, not justification.
   def get_buckets(self, request=None):
        """
        Get a list of names from all existing buckets of this user.
        :return: Returns a list of bucket names.
        """
        response = request({'format': 'json'})
        return [bucket['Name'] for bucket in response[1]]

SOLID

Single responsibility principle

Build cohesive abstractions; objects that do one thing. Avoid objects with multiple responsibilities.

  • Classes:

    # WORSE
    class TokenManager:
      def __init__(self, token):
          self.token = token
    
      def decode_token(self, token):
          return decode(token)
    
      def token_payload(self, token):
          return payload(decode(token))
    
      def authenticate(self, token):
          return decode(token['user'])
    
      def authorize(self, token):
          return decode(token['scope'])
    
    # BETTER
    class TokenManager:
      def __init__(self, token):
          self.token = token
    
      def decode_token(self, token):
          return decode(token)
    
      def token_payload(self, token):
          return payload(decode(token))
    
    class AuthManager:
      @classmethod
      def authenticate(cls, token):
          return decode(token['user'])
    
      @classmethod
      def authorize(cls, token):
          return decode(token['scope'])
    
  • Functions:

    # WORSE
    @classmethod
    def token(cls, request: cherrypy._ThreadLocalProxy) -> str:
        try:
            return request.cookie['token'].value
        except KeyError:
            return request.headers.get('X-Access-Token')
    
    # BETTER
    @staticmethod
    def _get_token_from_cookie(request: request) -> str:
        try:
            return request.cookie['token'].value
        except KeyError:
            ''
    
    @staticmethod
    def _get_token_from_headers(request: request) -> str:
        return request.headers.get('Token', '')
    
    @classmethod
    def token(cls, request: request) -> str:
        token = cls._get_token_from_cookie(request)
        return token if token else cls._get_token_from_headers(request)
    

References


REST API

  1. Resource names
  • Functions:
    # BAD
    @RESTController.Collection(method='GET', path='/getEncryption')
    def get_encryption(self, bucket_name, daemon_name=None, owner=None):
        return self._get_encryption(bucket_name, daemon_name, owner)
    
    @RESTController.Collection(method='DELETE', path='/deleteEncryption')
    def delete_encryption(self, bucket_name, daemon_name=None, owner=None):
        return self._delete_encryption(bucket_name, daemon_name, owner)
    
    
    # GOOD
    @RESTController.Collection(method='GET', path='/encryption')
    def get_encryption(self, bucket_name, daemon_name=None, owner=None):
        return self._get_encryption(bucket_name, daemon_name, owner)
    
    @RESTController.Collection(method='DELETE', path='/encryption')
    def delete_encryption(self, bucket_name, daemon_name=None, owner=None):
        return self._delete_encryption(bucket_name, daemon_name, owner)
    
    
  1. Documentation

    Find the API's specification at:

    These are auto-generated thanks to @EndpointDoc. Specify parameters and responses whenever possible

    • Parameters:
      
      # BAD
      @EndpointDoc("Create smb share",
              parameters={
                  'share_resource': (str, 'share_resource')
              })
      
       # GOOD
       @EndpointDoc(
           "Create a new NVMeoF subsystem",
           parameters={
               "nqn": Param(str, "NVMeoF subsystem NQN"),
               "enable_ha": Param(bool, "Enable high availability", True, None),
               "max_namespaces": Param(int, "Maximum number of namespaces", True, 4096),
               "no_group_append": Param(int, "Do not append gateway group name to the NQN",
                                       True, False),
               "serial_number": Param(int, "Subsystem serial number", True, None),
               "dhchap_key": Param(int, "Subsystem DH-HMAC-CHAP key", True, None),
               "gw_group": Param(str, "NVMeoF gateway group", True, None),
               "traddr": Param(str, "NVMeoF gateway address", True, None)
           }
       )
        ```
      
    • Responses:
      
       # GOOD
       SHARE_SCHEMA = {
           "resource_type": (str, "ceph.smb.share"),
           "cluster_id": (str, "Unique identifier for the cluster"),
           "share_id": (str, "Unique identifier for the share"),
           "intent": (str, "Desired state of the resource, e.g., 'present' or 'removed'"),
           "name": (str, "Name of the share"),
           "readonly": (bool, "Indicates if the share is read-only"),
           "browseable": (bool, "Indicates if the share is browseable"),
           "cephfs": ({
               "volume": (str, "Name of the CephFS file system"),
               "path": (str, "Path within the CephFS file system"),
               "provider": (str, "Provider of the CephFS share, e.g., 'samba-vfs'"),
               "subvolumegroup": (str, "Subvolume Group in CephFS file system"),
               "subvolume": (str, "Subvolume within the CephFS file system"),
           }, "Configuration for the CephFS share")
       }
      
       @EndpointDoc("Get an smb share",
                   parameters={
                       'cluster_id': (str, 'Unique identifier for the cluster'),
                       'share_id': (str, 'Unique identifier for the share')
                   },
                   responses={200: SHARE_SCHEMA})
      

Front-end (Angular: Typescript, SCSS, HTML)

References


Security

  1. Principle of Least Privilege

References