Source code for nvd_api.low_api.api_client

"""
    NVD API 2.0 Python API

    No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)  # noqa: E501

    The version of the OpenAPI document: 0.1.0
    Contact: 15080890+kannkyo@users.noreply.github.com
    Generated by: https://openapi-generator.tech
"""


import json
import atexit
import mimetypes
from multiprocessing.pool import ThreadPool
import io
import os
import re
import typing
from urllib.parse import quote
from urllib3.fields import RequestField


from nvd_api.low_api import rest
from nvd_api.low_api.configuration import Configuration
from nvd_api.low_api.exceptions import ApiTypeError, ApiValueError, ApiException
from nvd_api.low_api.model_utils import (
    ModelNormal,
    ModelSimple,
    ModelComposed,
    check_allowed_values,
    check_validations,
    date,
    datetime,
    deserialize_file,
    file_type,
    model_to_dict,
    none_type,
    validate_and_convert_types
)


[docs]class ApiClient(object): """Generic API client for OpenAPI client library builds. OpenAPI generic API client. This client handles the client- server communication, and is invariant across implementations. Specifics of the methods and models for each application are generated from the OpenAPI templates. NOTE: This class is auto generated by OpenAPI Generator. Ref: https://openapi-generator.tech Do not edit the class manually. :param configuration: .Configuration object for this client :param header_name: a header to pass when making calls to the API. :param header_value: a header value to pass when making calls to the API. :param cookie: a cookie to include in the header when making calls to the API :param pool_threads: The number of threads to use for async requests to the API. More threads means more concurrent API requests. """ _pool = None def __init__(self, configuration=None, header_name=None, header_value=None, cookie=None, pool_threads=1): if configuration is None: configuration = Configuration.get_default_copy() self.configuration = configuration self.pool_threads = pool_threads self.rest_client = rest.RESTClientObject(configuration) self.default_headers = {} if header_name is not None: self.default_headers[header_name] = header_value self.cookie = cookie # Set default User-Agent. self.user_agent = 'OpenAPI-Generator/1.0.0/python' def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.close()
[docs] def close(self): if self._pool: self._pool.close() self._pool.join() self._pool = None if hasattr(atexit, 'unregister'): atexit.unregister(self.close)
@property def pool(self): """Create thread pool on first request avoids instantiating unused threadpool for blocking clients. """ if self._pool is None: atexit.register(self.close) self._pool = ThreadPool(self.pool_threads) return self._pool @property def user_agent(self): """User agent for this API client""" return self.default_headers['User-Agent'] @user_agent.setter def user_agent(self, value): self.default_headers['User-Agent'] = value
[docs] def set_default_header(self, header_name, header_value): self.default_headers[header_name] = header_value
def __call_api( self, resource_path: str, method: str, path_params: typing.Optional[typing.Dict[str, typing.Any]] = None, query_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None, header_params: typing.Optional[typing.Dict[str, typing.Any]] = None, body: typing.Optional[typing.Any] = None, post_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None, response_type: typing.Optional[typing.Tuple[typing.Any]] = None, auth_settings: typing.Optional[typing.List[str]] = None, _return_http_data_only: typing.Optional[bool] = None, collection_formats: typing.Optional[typing.Dict[str, str]] = None, _preload_content: bool = True, _request_timeout: typing.Optional[typing.Union[int, float, typing.Tuple]] = None, _host: typing.Optional[str] = None, _check_type: typing.Optional[bool] = None, _content_type: typing.Optional[str] = None, _request_auths: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None ): config = self.configuration # header parameters header_params = header_params or {} header_params.update(self.default_headers) if self.cookie: header_params['Cookie'] = self.cookie if header_params: header_params = self.sanitize_for_serialization(header_params) header_params = dict(self.parameters_to_tuples(header_params, collection_formats)) # path parameters if path_params: path_params = self.sanitize_for_serialization(path_params) path_params = self.parameters_to_tuples(path_params, collection_formats) for k, v in path_params: # specified safe chars, encode everything resource_path = resource_path.replace( '{%s}' % k, quote(str(v), safe=config.safe_chars_for_path_param) ) # query parameters if query_params: query_params = self.sanitize_for_serialization(query_params) query_params = self.parameters_to_tuples(query_params, collection_formats) # post parameters if post_params or files: post_params = post_params if post_params else [] post_params = self.sanitize_for_serialization(post_params) post_params = self.parameters_to_tuples(post_params, collection_formats) post_params.extend(self.files_parameters(files)) if header_params['Content-Type'].startswith("multipart"): post_params = self.parameters_to_multipart(post_params, (dict)) # body if body: body = self.sanitize_for_serialization(body) # auth setting self.update_params_for_auth(header_params, query_params, auth_settings, resource_path, method, body, request_auths=_request_auths) # request url if _host is None: url = self.configuration.host + resource_path else: # use server/host defined in path or operation instead url = _host + resource_path try: # perform request and return response response_data = self.request( method, url, query_params=query_params, headers=header_params, post_params=post_params, body=body, _preload_content=_preload_content, _request_timeout=_request_timeout) except ApiException as e: e.body = e.body.decode('utf-8') raise e self.last_response = response_data return_data = response_data if not _preload_content: return (return_data) return return_data # deserialize response data if response_type: if response_type != (file_type,): encoding = "utf-8" content_type = response_data.getheader('content-type') if content_type is not None: match = re.search(r"charset=([a-zA-Z\-\d]+)[\s\;]?", content_type) if match: encoding = match.group(1) response_data.data = response_data.data.decode(encoding) return_data = self.deserialize( response_data, response_type, _check_type ) else: return_data = None if _return_http_data_only: return (return_data) else: return (return_data, response_data.status, response_data.getheaders())
[docs] def parameters_to_multipart(self, params, collection_types): """Get parameters as list of tuples, formatting as json if value is collection_types :param params: Parameters as list of two-tuples :param dict collection_types: Parameter collection types :return: Parameters as list of tuple or urllib3.fields.RequestField """ new_params = [] if collection_types is None: collection_types = (dict) for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501 if isinstance( v, collection_types): # v is instance of collection_type, formatting as application/json v = json.dumps(v, ensure_ascii=False).encode("utf-8") field = RequestField(k, v) field.make_multipart(content_type="application/json; charset=utf-8") new_params.append(field) else: new_params.append((k, v)) return new_params
[docs] @classmethod def sanitize_for_serialization(cls, obj): """Prepares data for transmission before it is sent with the rest client If obj is None, return None. If obj is str, int, long, float, bool, return directly. If obj is datetime.datetime, datetime.date convert to string in iso8601 format. If obj is list, sanitize each element in the list. If obj is dict, return the dict. If obj is OpenAPI model, return the properties dict. If obj is io.IOBase, return the bytes :param obj: The data to serialize. :return: The serialized form of data. """ if isinstance(obj, (ModelNormal, ModelComposed)): return { key: cls.sanitize_for_serialization(val) for key, val in model_to_dict( obj, serialize=True).items()} elif isinstance(obj, io.IOBase): return cls.get_file_data_and_close_file(obj) elif isinstance(obj, (str, int, float, none_type, bool)): return obj elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, ModelSimple): return cls.sanitize_for_serialization(obj.value) elif isinstance(obj, (list, tuple)): return [cls.sanitize_for_serialization(item) for item in obj] if isinstance(obj, dict): return {key: cls.sanitize_for_serialization(val) for key, val in obj.items()} raise ApiValueError( 'Unable to prepare type {} for serialization'.format( obj.__class__.__name__))
[docs] def deserialize(self, response, response_type, _check_type): """Deserializes response into an object. :param response: RESTResponse object to be deserialized. :param response_type: For the response, a tuple containing: valid classes a list containing valid classes (for list schemas) a dict containing a tuple of valid classes as the value Example values: (str,) (Pet,) (float, none_type) ([int, none_type],) ({str: (bool, str, int, float, date, datetime, str, none_type)},) :param _check_type: boolean, whether to check the types of the data received from the server :type _check_type: bool :return: deserialized object. """ # handle file downloading # save response body into a tmp file and return the instance if response_type == (file_type,): content_disposition = response.getheader("Content-Disposition") return deserialize_file(response.data, self.configuration, content_disposition=content_disposition) # fetch data from response object try: received_data = json.loads(response.data) except ValueError: received_data = response.data # store our data under the key of 'received_data' so users have some # context if they are deserializing a string and the data type is wrong deserialized_data = validate_and_convert_types( received_data, response_type, ['received_data'], True, _check_type, configuration=self.configuration ) return deserialized_data
[docs] def call_api( self, resource_path: str, method: str, path_params: typing.Optional[typing.Dict[str, typing.Any]] = None, query_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None, header_params: typing.Optional[typing.Dict[str, typing.Any]] = None, body: typing.Optional[typing.Any] = None, post_params: typing.Optional[typing.List[typing.Tuple[str, typing.Any]]] = None, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None, response_type: typing.Optional[typing.Tuple[typing.Any]] = None, auth_settings: typing.Optional[typing.List[str]] = None, async_req: typing.Optional[bool] = None, _return_http_data_only: typing.Optional[bool] = None, collection_formats: typing.Optional[typing.Dict[str, str]] = None, _preload_content: bool = True, _request_timeout: typing.Optional[typing.Union[int, float, typing.Tuple]] = None, _host: typing.Optional[str] = None, _check_type: typing.Optional[bool] = None, _request_auths: typing.Optional[typing.List[typing.Dict[str, typing.Any]]] = None ): """Makes the HTTP request (synchronous) and returns deserialized data. To make an async_req request, set the async_req parameter. :param resource_path: Path to method endpoint. :param method: Method to call. :param path_params: Path parameters in the url. :param query_params: Query parameters in the url. :param header_params: Header parameters to be placed in the request header. :param body: Request body. :param post_params dict: Request post form parameters, for `application/x-www-form-urlencoded`, `multipart/form-data`. :param auth_settings list: Auth Settings names for the request. :param response_type: For the response, a tuple containing: valid classes a list containing valid classes (for list schemas) a dict containing a tuple of valid classes as the value Example values: (str,) (Pet,) (float, none_type) ([int, none_type],) ({str: (bool, str, int, float, date, datetime, str, none_type)},) :param files: key -> field name, value -> a list of open file objects for `multipart/form-data`. :type files: dict :param async_req bool: execute request asynchronously :type async_req: bool, optional :param _return_http_data_only: response data without head status code and headers :type _return_http_data_only: bool, optional :param collection_formats: dict of collection formats for path, query, header, and post parameters. :type collection_formats: dict, optional :param _preload_content: if False, the urllib3.HTTPResponse object will be returned without reading/decoding response data. Default is True. :type _preload_content: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _check_type: boolean describing if the data back from the server should have its type checked. :type _check_type: bool, optional :param _request_auths: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auths: list, optional :return: If async_req parameter is True, the request will be called asynchronously. The method will return the request thread. If parameter async_req is False or missing, then the method will return the response directly. """ if not async_req: return self.__call_api(resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host, _check_type, _request_auths=_request_auths) return self.pool.apply_async(self.__call_api, (resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host, _check_type, None, _request_auths))
[docs] def request(self, method, url, query_params=None, headers=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): """Makes the HTTP request using RESTClient.""" if method == "GET": return self.rest_client.GET(url, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, headers=headers) elif method == "HEAD": return self.rest_client.HEAD(url, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, headers=headers) elif method == "OPTIONS": return self.rest_client.OPTIONS(url, query_params=query_params, headers=headers, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) elif method == "POST": return self.rest_client.POST(url, query_params=query_params, headers=headers, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) elif method == "PUT": return self.rest_client.PUT(url, query_params=query_params, headers=headers, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) elif method == "PATCH": return self.rest_client.PATCH(url, query_params=query_params, headers=headers, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) elif method == "DELETE": return self.rest_client.DELETE(url, query_params=query_params, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) else: raise ApiValueError( "http method must be `GET`, `HEAD`, `OPTIONS`," " `POST`, `PATCH`, `PUT` or `DELETE`." )
[docs] def parameters_to_tuples(self, params, collection_formats): """Get parameters as list of tuples, formatting collections. :param params: Parameters as dict or list of two-tuples :param dict collection_formats: Parameter collection formats :return: Parameters as list of tuples, collections formatted """ new_params = [] if collection_formats is None: collection_formats = {} for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501 if k in collection_formats: collection_format = collection_formats[k] if collection_format == 'multi': new_params.extend((k, value) for value in v) else: if collection_format == 'ssv': delimiter = ' ' elif collection_format == 'tsv': delimiter = '\t' elif collection_format == 'pipes': delimiter = '|' else: # csv is the default delimiter = ',' new_params.append( (k, delimiter.join(str(value) for value in v))) else: new_params.append((k, v)) return new_params
[docs] @staticmethod def get_file_data_and_close_file(file_instance: io.IOBase) -> bytes: file_data = file_instance.read() file_instance.close() return file_data
[docs] def files_parameters(self, files: typing.Optional[typing.Dict[str, typing.List[io.IOBase]]] = None): """Builds form parameters. :param files: None or a dict with key=param_name and value is a list of open file objects :return: List of tuples of form parameters with file data """ if files is None: return [] params = [] for param_name, file_instances in files.items(): if file_instances is None: # if the file field is nullable, skip None values continue for file_instance in file_instances: if file_instance is None: # if the file field is nullable, skip None values continue if file_instance.closed is True: raise ApiValueError( "Cannot read a closed file. The passed in file_type " "for %s must be open." % param_name ) filename = os.path.basename(file_instance.name) filedata = self.get_file_data_and_close_file(file_instance) mimetype = (mimetypes.guess_type(filename)[0] or 'application/octet-stream') params.append( tuple([param_name, tuple([filename, filedata, mimetype])])) return params
[docs] def select_header_accept(self, accepts): """Returns `Accept` based on an array of accepts provided. :param accepts: List of headers. :return: Accept (e.g. application/json). """ if not accepts: return accepts = [x.lower() for x in accepts] if 'application/json' in accepts: return 'application/json' else: return ', '.join(accepts)
[docs] def select_header_content_type(self, content_types, method=None, body=None): """Returns `Content-Type` based on an array of content_types provided. :param content_types: List of content-types. :param method: http method (e.g. POST, PATCH). :param body: http body to send. :return: Content-Type (e.g. application/json). """ if not content_types: return None content_types = [x.lower() for x in content_types] if (method == 'PATCH' and 'application/json-patch+json' in content_types and isinstance(body, list)): return 'application/json-patch+json' if 'application/json' in content_types or '*/*' in content_types: return 'application/json' else: return content_types[0]
[docs] def update_params_for_auth(self, headers, queries, auth_settings, resource_path, method, body, request_auths=None): """Updates header and query params based on authentication setting. :param headers: Header parameters dict to be updated. :param queries: Query parameters tuple list to be updated. :param auth_settings: Authentication setting identifiers list. :param resource_path: A string representation of the HTTP request resource path. :param method: A string representation of the HTTP request method. :param body: A object representing the body of the HTTP request. The object type is the return value of _encoder.default(). :param request_auths: if set, the provided settings will override the token in the configuration. """ if not auth_settings: return if request_auths: for auth_setting in request_auths: self._apply_auth_params( headers, queries, resource_path, method, body, auth_setting) return for auth in auth_settings: auth_setting = self.configuration.auth_settings().get(auth) if auth_setting: self._apply_auth_params( headers, queries, resource_path, method, body, auth_setting)
def _apply_auth_params(self, headers, queries, resource_path, method, body, auth_setting): if auth_setting['in'] == 'cookie': headers['Cookie'] = auth_setting['key'] + "=" + auth_setting['value'] elif auth_setting['in'] == 'header': if auth_setting['type'] != 'http-signature': headers[auth_setting['key']] = auth_setting['value'] elif auth_setting['in'] == 'query': queries.append((auth_setting['key'], auth_setting['value'])) else: raise ApiValueError( 'Authentication token must be in `query` or `header`' )
[docs]class Endpoint(object): def __init__(self, settings=None, params_map=None, root_map=None, headers_map=None, api_client=None, callable=None): """Creates an endpoint Args: settings (dict): see below key value pairs 'response_type' (tuple/None): response type 'auth' (list): a list of auth type keys 'endpoint_path' (str): the endpoint path 'operation_id' (str): endpoint string identifier 'http_method' (str): POST/PUT/PATCH/GET etc 'servers' (list): list of str servers that this endpoint is at params_map (dict): see below key value pairs 'all' (list): list of str endpoint parameter names 'required' (list): list of required parameter names 'nullable' (list): list of nullable parameter names 'enum' (list): list of parameters with enum values 'validation' (list): list of parameters with validations root_map 'validations' (dict): the dict mapping endpoint parameter tuple paths to their validation dictionaries 'allowed_values' (dict): the dict mapping endpoint parameter tuple paths to their allowed_values (enum) dictionaries 'openapi_types' (dict): param_name to openapi type 'attribute_map' (dict): param_name to camelCase name 'location_map' (dict): param_name to 'body', 'file', 'form', 'header', 'path', 'query' collection_format_map (dict): param_name to `csv` etc. headers_map (dict): see below key value pairs 'accept' (list): list of Accept header strings 'content_type' (list): list of Content-Type header strings api_client (ApiClient) api client instance callable (function): the function which is invoked when the Endpoint is called """ self.settings = settings self.params_map = params_map self.params_map['all'].extend([ 'async_req', '_host_index', '_preload_content', '_request_timeout', '_return_http_data_only', '_check_input_type', '_check_return_type', '_content_type', '_spec_property_naming', '_request_auths' ]) self.params_map['nullable'].extend(['_request_timeout']) self.validations = root_map['validations'] self.allowed_values = root_map['allowed_values'] self.openapi_types = root_map['openapi_types'] extra_types = { 'async_req': (bool,), '_host_index': (none_type, int), '_preload_content': (bool,), '_request_timeout': (none_type, float, (float,), [float], int, (int,), [int]), '_return_http_data_only': (bool,), '_check_input_type': (bool,), '_check_return_type': (bool,), '_spec_property_naming': (bool,), '_content_type': (none_type, str), '_request_auths': (none_type, list) } self.openapi_types.update(extra_types) self.attribute_map = root_map['attribute_map'] self.location_map = root_map['location_map'] self.collection_format_map = root_map['collection_format_map'] self.headers_map = headers_map self.api_client = api_client self.callable = callable def __validate_inputs(self, kwargs): for param in self.params_map['enum']: if param in kwargs: check_allowed_values( self.allowed_values, (param,), kwargs[param] ) for param in self.params_map['validation']: if param in kwargs: check_validations( self.validations, (param,), kwargs[param], configuration=self.api_client.configuration ) if kwargs['_check_input_type'] is False: return for key, value in kwargs.items(): fixed_val = validate_and_convert_types( value, self.openapi_types[key], [key], kwargs['_spec_property_naming'], kwargs['_check_input_type'], configuration=self.api_client.configuration ) kwargs[key] = fixed_val def __gather_params(self, kwargs): params = { 'body': None, 'collection_format': {}, 'file': {}, 'form': [], 'header': {}, 'path': {}, 'query': [] } for param_name, param_value in kwargs.items(): param_location = self.location_map.get(param_name) if param_location is None: continue if param_location: if param_location == 'body': params['body'] = param_value continue base_name = self.attribute_map[param_name] if (param_location == 'form' and self.openapi_types[param_name] == (file_type,)): params['file'][base_name] = [param_value] elif (param_location == 'form' and self.openapi_types[param_name] == ([file_type],)): # param_value is already a list params['file'][base_name] = param_value elif param_location in {'form', 'query'}: param_value_full = (base_name, param_value) params[param_location].append(param_value_full) if param_location not in {'form', 'query'}: params[param_location][base_name] = param_value collection_format = self.collection_format_map.get(param_name) if collection_format: params['collection_format'][base_name] = collection_format return params def __call__(self, *args, **kwargs): """ This method is invoked when endpoints are called Example: api_instance = ProductsApi() api_instance.get_cpe_match # this is an instance of the class Endpoint api_instance.get_cpe_match() # this invokes api_instance.get_cpe_match.__call__() which then invokes the callable functions stored in that endpoint at api_instance.get_cpe_match.callable or self.callable in this class """ return self.callable(self, *args, **kwargs)
[docs] def call_with_http_info(self, **kwargs): try: index = self.api_client.configuration.server_operation_index.get( self.settings['operation_id'], self.api_client.configuration.server_index ) if kwargs['_host_index'] is None else kwargs['_host_index'] server_variables = self.api_client.configuration.server_operation_variables.get( self.settings['operation_id'], self.api_client.configuration.server_variables ) _host = self.api_client.configuration.get_host_from_settings( index, variables=server_variables, servers=self.settings['servers'] ) except IndexError: if self.settings['servers']: raise ApiValueError( "Invalid host index. Must be 0 <= index < %s" % len(self.settings['servers']) ) _host = None for key, value in kwargs.items(): if key not in self.params_map['all']: raise ApiTypeError( "Got an unexpected parameter '%s'" " to method `%s`" % (key, self.settings['operation_id']) ) # only throw this nullable ApiValueError if _check_input_type # is False, if _check_input_type==True we catch this case # in self.__validate_inputs if (key not in self.params_map['nullable'] and value is None and kwargs['_check_input_type'] is False): raise ApiValueError( "Value may not be None for non-nullable parameter `%s`" " when calling `%s`" % (key, self.settings['operation_id']) ) for key in self.params_map['required']: if key not in kwargs.keys(): raise ApiValueError( "Missing the required parameter `%s` when calling " "`%s`" % (key, self.settings['operation_id']) ) self.__validate_inputs(kwargs) params = self.__gather_params(kwargs) accept_headers_list = self.headers_map['accept'] if accept_headers_list: params['header']['Accept'] = self.api_client.select_header_accept( accept_headers_list) if kwargs.get('_content_type'): params['header']['Content-Type'] = kwargs['_content_type'] else: content_type_headers_list = self.headers_map['content_type'] if content_type_headers_list: if params['body'] != "": content_types_list = self.api_client.select_header_content_type( content_type_headers_list, self.settings['http_method'], params['body']) if content_types_list: params['header']['Content-Type'] = content_types_list return self.api_client.call_api( self.settings['endpoint_path'], self.settings['http_method'], params['path'], params['query'], params['header'], body=params['body'], post_params=params['form'], files=params['file'], response_type=self.settings['response_type'], auth_settings=self.settings['auth'], async_req=kwargs['async_req'], _check_type=kwargs['_check_return_type'], _return_http_data_only=kwargs['_return_http_data_only'], _preload_content=kwargs['_preload_content'], _request_timeout=kwargs['_request_timeout'], _host=_host, _request_auths=kwargs['_request_auths'], collection_formats=params['collection_format'])