Source code for nvd_api.low_api.rest

"""
    NVD API 2.0 Python API

    No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)  # noqa: E501

    The version of the OpenAPI document: 0.1.0
    Contact: 15080890+kannkyo@users.noreply.github.com
    Generated by: https://openapi-generator.tech
"""


import io
import json
import logging
import re
import ssl
from urllib.parse import urlencode
from urllib.parse import urlparse
from urllib.request import proxy_bypass_environment
import urllib3
import ipaddress

from nvd_api.low_api.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError


logger = logging.getLogger(__name__)


[docs]class RESTResponse(io.IOBase): def __init__(self, resp): self.urllib3_response = resp self.status = resp.status self.reason = resp.reason self.data = resp.data
[docs] def getheaders(self): """Returns a dictionary of the response headers.""" return self.urllib3_response.getheaders()
[docs] def getheader(self, name, default=None): """Returns a given response header.""" return self.urllib3_response.getheader(name, default)
[docs]class RESTClientObject(object): def __init__(self, configuration, pools_size=4, maxsize=None): # urllib3.PoolManager will pass all kw parameters to connectionpool # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501 # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501 # maxsize is the number of requests to host that are allowed in parallel # noqa: E501 # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 # cert_reqs if configuration.verify_ssl: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE addition_pool_args = {} if configuration.assert_hostname is not None: addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501 if configuration.retries is not None: addition_pool_args['retries'] = configuration.retries if configuration.socket_options is not None: addition_pool_args['socket_options'] = configuration.socket_options if maxsize is None: if configuration.connection_pool_maxsize is not None: maxsize = configuration.connection_pool_maxsize else: maxsize = 4 # https pool manager if configuration.proxy and not should_bypass_proxies( configuration.host, no_proxy=configuration.no_proxy or ''): self.pool_manager = urllib3.ProxyManager( num_pools=pools_size, maxsize=maxsize, cert_reqs=cert_reqs, ca_certs=configuration.ssl_ca_cert, cert_file=configuration.cert_file, key_file=configuration.key_file, proxy_url=configuration.proxy, proxy_headers=configuration.proxy_headers, **addition_pool_args ) else: self.pool_manager = urllib3.PoolManager( num_pools=pools_size, maxsize=maxsize, cert_reqs=cert_reqs, ca_certs=configuration.ssl_ca_cert, cert_file=configuration.cert_file, key_file=configuration.key_file, **addition_pool_args )
[docs] def request(self, method, url, query_params=None, headers=None, body=None, post_params=None, _preload_content=True, _request_timeout=None): """Perform requests. :param method: http request method :param url: http request url :param query_params: query parameters in the url :param headers: http request headers :param body: request json body, for `application/json` :param post_params: request post parameters, `application/x-www-form-urlencoded` and `multipart/form-data` :param _preload_content: if False, the urllib3.HTTPResponse object will be returned without reading/decoding response data. Default is True. :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. """ method = method.upper() assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', 'PATCH', 'OPTIONS'] if post_params and body: raise ApiValueError( "body parameter cannot be used with post_params parameter." ) post_params = post_params or {} headers = headers or {} timeout = None if _request_timeout: if isinstance(_request_timeout, (int, float)): # noqa: E501,F821 timeout = urllib3.Timeout(total=_request_timeout) elif (isinstance(_request_timeout, tuple) and len(_request_timeout) == 2): timeout = urllib3.Timeout( connect=_request_timeout[0], read=_request_timeout[1]) try: # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: # Only set a default Content-Type for POST, PUT, PATCH and OPTIONS requests if (method != 'DELETE') and ('Content-Type' not in headers): headers['Content-Type'] = 'application/json' if query_params: url += '?' + urlencode(query_params) if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)): request_body = None if body is not None: request_body = json.dumps(body) r = self.pool_manager.request( method, url, body=request_body, preload_content=_preload_content, timeout=timeout, headers=headers) elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501 r = self.pool_manager.request( method, url, fields=post_params, encode_multipart=False, preload_content=_preload_content, timeout=timeout, headers=headers) elif headers['Content-Type'] == 'multipart/form-data': # must del headers['Content-Type'], or the correct # Content-Type which generated by urllib3 will be # overwritten. del headers['Content-Type'] r = self.pool_manager.request( method, url, fields=post_params, encode_multipart=True, preload_content=_preload_content, timeout=timeout, headers=headers) # Pass a `string` parameter directly in the body to support # other content types than Json when `body` argument is # provided in serialized form elif isinstance(body, str) or isinstance(body, bytes): request_body = body r = self.pool_manager.request( method, url, body=request_body, preload_content=_preload_content, timeout=timeout, headers=headers) else: # Cannot generate the request from given parameters msg = """Cannot prepare a request message for provided arguments. Please check that your arguments match declared content type.""" raise ApiException(status=0, reason=msg) # For `GET`, `HEAD` else: r = self.pool_manager.request(method, url, fields=query_params, preload_content=_preload_content, timeout=timeout, headers=headers) except urllib3.exceptions.SSLError as e: msg = "{0}\n{1}".format(type(e).__name__, str(e)) raise ApiException(status=0, reason=msg) if _preload_content: r = RESTResponse(r) # log response body logger.debug("response body: %s", r.data) if not 200 <= r.status <= 299: if r.status == 401: raise UnauthorizedException(http_resp=r) if r.status == 403: raise ForbiddenException(http_resp=r) if r.status == 404: raise NotFoundException(http_resp=r) if 500 <= r.status <= 599: raise ServiceException(http_resp=r) raise ApiException(http_resp=r) return r
[docs] def GET(self, url, headers=None, query_params=None, _preload_content=True, _request_timeout=None): return self.request("GET", url, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, query_params=query_params)
[docs] def HEAD(self, url, headers=None, query_params=None, _preload_content=True, _request_timeout=None): return self.request("HEAD", url, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, query_params=query_params)
[docs] def OPTIONS(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("OPTIONS", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body)
[docs] def DELETE(self, url, headers=None, query_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("DELETE", url, headers=headers, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body)
[docs] def POST(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("POST", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body)
[docs] def PUT(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("PUT", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body)
[docs] def PATCH(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("PATCH", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body)
# end of class RESTClientObject
[docs]def is_ipv4(target): """ Test if IPv4 address or not """ try: chk = ipaddress.IPv4Address(target) return True except ipaddress.AddressValueError: return False
[docs]def in_ipv4net(target, net): """ Test if target belongs to given IPv4 network """ try: nw = ipaddress.IPv4Network(net) ip = ipaddress.IPv4Address(target) if ip in nw: return True return False except ipaddress.AddressValueError: return False except ipaddress.NetmaskValueError: return False
[docs]def should_bypass_proxies(url, no_proxy=None): """ Yet another requests.should_bypass_proxies Test if proxies should not be used for a particular url. """ parsed = urlparse(url) # special cases if parsed.hostname in [None, '']: return True # special cases if no_proxy in [None, '']: return False if no_proxy == '*': return True no_proxy = no_proxy.lower().replace(' ', ''); entries = ( host for host in no_proxy.split(',') if host ) if is_ipv4(parsed.hostname): for item in entries: if in_ipv4net(parsed.hostname, item): return True return proxy_bypass_environment(parsed.hostname, {'no': no_proxy})