392 lines
11 KiB
Python
392 lines
11 KiB
Python
import functools
|
|
import hmac
|
|
import logging
|
|
import sys
|
|
import os
|
|
import re
|
|
|
|
from stripe._api_mode import ApiMode
|
|
|
|
from urllib.parse import quote_plus
|
|
|
|
from typing_extensions import Type, TYPE_CHECKING
|
|
from typing import (
|
|
TypeVar,
|
|
Union,
|
|
overload,
|
|
Dict,
|
|
List,
|
|
cast,
|
|
Any,
|
|
Optional,
|
|
Mapping,
|
|
)
|
|
|
|
|
|
# Used for global variables
|
|
import stripe # noqa: IMP101
|
|
|
|
if TYPE_CHECKING:
|
|
from stripe._stripe_response import StripeResponse
|
|
from stripe._stripe_object import StripeObject
|
|
from stripe._api_requestor import _APIRequestor
|
|
|
|
# Value of the STRIPE_LOG environment variable, read once when this module
# is imported. _console_log_level() falls back to it when stripe.log is not
# "debug" or "info".
STRIPE_LOG = os.environ.get("STRIPE_LOG")

# Logger named "stripe"; log_debug() and log_info() emit through it.
logger: logging.Logger = logging.getLogger("stripe")
|
|
|
|
|
|
def is_appengine_dev():
    """Tell whether the environment looks like an App Engine dev server.

    :returns: True when the ``APPENGINE_RUNTIME`` environment variable is
        set and ``SERVER_SOFTWARE`` contains the substring ``"Dev"``;
        False otherwise.
    """
    if "APPENGINE_RUNTIME" not in os.environ:
        return False
    server_software = os.environ.get("SERVER_SOFTWARE", "")
    return "Dev" in server_software
|
|
|
|
|
|
def _console_log_level():
    """Return the active console log level, or None when there is none.

    ``stripe.log`` takes precedence over the ``STRIPE_LOG`` value captured
    at import time. Only ``"debug"`` and ``"info"`` are recognised; any
    other value is ignored.
    """
    for candidate in (stripe.log, STRIPE_LOG):
        if candidate in ["debug", "info"]:
            return candidate
    return None
|
|
|
|
|
|
def log_debug(message, **params):
    """Log ``message`` and ``params`` at debug level.

    The fields are rendered into one line with ``logfmt``. That line is
    printed to stderr when the console log level is ``"debug"``, and is
    always passed to ``logger.debug``.
    """
    line = logfmt({"message": message, **params})
    console_level = _console_log_level()
    if console_level == "debug":
        print(line, file=sys.stderr)
    logger.debug(line)
|
|
|
|
|
|
def log_info(message, **params):
    """Log ``message`` and ``params`` at info level.

    The fields are rendered into one line with ``logfmt``. That line is
    printed to stderr when the console log level is ``"debug"`` or
    ``"info"``, and is always passed to ``logger.info``.
    """
    line = logfmt({"message": message, **params})
    console_level = _console_log_level()
    if console_level in ["debug", "info"]:
        print(line, file=sys.stderr)
    logger.info(line)
|
|
|
|
|
|
def _test_or_live_environment():
    """Derive ``"live"`` or ``"test"`` from the prefix of ``stripe.api_key``.

    :returns: ``"live"`` or ``"test"`` when the key starts with
        ``sk_live_`` or ``sk_test_``; None when no key is set or the key
        has a different prefix.
    """
    api_key = stripe.api_key
    if api_key is None:
        return None
    found = re.match(r"sk_(live|test)_", api_key)
    return found.group(1) if found is not None else None
|
|
|
|
|
|
def dashboard_link(request_id):
    """Build the dashboard logs URL for ``request_id``.

    The environment path segment comes from ``_test_or_live_environment()``
    and defaults to ``"test"`` when that returns nothing.
    """
    environment = _test_or_live_environment() or "test"
    template = "https://dashboard.stripe.com/{env}/logs/{reqid}"
    return template.format(env=environment, reqid=request_id)
|
|
|
|
|
|
def logfmt(props):
    """Render a mapping as a single ``key=value`` log line.

    Pairs are emitted in sorted key order and separated by single spaces.
    A key or value that contains whitespace is replaced by its ``repr()``
    so the line stays unambiguous.

    :param props: Mapping of field names (expected to be strings) to
        values of any type.
    :returns: The formatted line; an empty string for an empty mapping.
    """

    def fmt(key, val):
        if isinstance(val, (bytes, bytearray)):
            # Logging must never raise because of the payload: bytes that
            # are not valid UTF-8 are decoded with replacement characters
            # instead of propagating UnicodeDecodeError to the caller.
            val = val.decode("utf-8", errors="replace")
        elif hasattr(val, "decode"):
            # Other bytes-like objects exposing decode() keep the plain
            # single-argument call.
            val = val.decode("utf-8")
        # Anything that is not already text is rendered with str().
        if not isinstance(val, str):
            val = str(val)
        if re.search(r"\s", val):
            val = repr(val)
        # key should already be a string
        if re.search(r"\s", key):
            key = repr(key)
        return "{key}={val}".format(key=key, val=val)

    return " ".join([fmt(key, val) for key, val in sorted(props.items())])
|
|
|
|
|
|
# Borrowed from Django's source code
if hasattr(hmac, "compare_digest"):

    def secure_compare(val1, val2):
        """Compare two values using the stdlib ``hmac.compare_digest``."""
        # Prefer the stdlib implementation, when available.
        return hmac.compare_digest(val1, val2)

else:

    def secure_compare(val1, val2):
        """
        Returns True if the two strings are equal, False otherwise.

        Every pair of characters is examined regardless of where the first
        mismatch occurs. Inputs of different lengths are rejected up front,
        so the comparison is only constant-time for equal-length inputs.
        """
        if len(val1) != len(val2):
            return False

        both_bytes = isinstance(val1, bytes) and isinstance(val2, bytes)
        diff = 0
        for left, right in zip(val1, val2):
            if both_bytes:
                # Iterating bytes yields ints already.
                diff |= left ^ right
            else:
                diff |= ord(cast(str, left)) ^ ord(cast(str, right))
        return diff == 0
|
|
|
|
|
|
# Shape of a response payload accepted by the conversion helpers in this
# module: a StripeResponse, a plain dict, or a list whose items are
# themselves any of these (the alias is recursive).
Resp = Union["StripeResponse", Dict[str, Any], List["Resp"]]
|
|
|
|
|
|
@overload
def convert_to_stripe_object(
    resp: Union["StripeResponse", Dict[str, Any]],
    api_key: Optional[str] = None,
    stripe_version: Optional[str] = None,
    stripe_account: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    *,
    api_mode: ApiMode = "V1",
) -> "StripeObject": ...


@overload
def convert_to_stripe_object(
    resp: List[Resp],
    api_key: Optional[str] = None,
    stripe_version: Optional[str] = None,
    stripe_account: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    *,
    api_mode: ApiMode = "V1",
) -> List["StripeObject"]: ...


def convert_to_stripe_object(
    resp: Resp,
    api_key: Optional[str] = None,
    stripe_version: Optional[str] = None,
    stripe_account: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    *,
    api_mode: ApiMode = "V1",
) -> Union["StripeObject", List["StripeObject"]]:
    """Convert a response payload using a requestor built from global options.

    A requestor is obtained from ``_APIRequestor._global_with_options``
    with the given ``api_key``, ``stripe_version`` and ``stripe_account``;
    the conversion itself is delegated to ``_convert_to_stripe_object``.

    :param resp: A StripeResponse, a dict, or a list of those.
    :param params: Forwarded unchanged to ``_convert_to_stripe_object``.
    :param klass_: Forwarded unchanged to ``_convert_to_stripe_object``.
    :param api_mode: Forwarded unchanged; defaults to ``"V1"``.
    :returns: Whatever ``_convert_to_stripe_object`` returns for ``resp``.
    """
    # Imported at call time rather than at module import time.
    from stripe._api_requestor import _APIRequestor

    global_requestor = _APIRequestor._global_with_options(
        api_key=api_key,
        stripe_version=stripe_version,
        stripe_account=stripe_account,
    )
    return _convert_to_stripe_object(
        resp=resp,
        params=params,
        klass_=klass_,
        requestor=global_requestor,
        api_mode=api_mode,
    )
|
|
|
|
|
|
@overload
def _convert_to_stripe_object(
    *,
    resp: Union["StripeResponse", Dict[str, Any]],
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    requestor: "_APIRequestor",
    api_mode: ApiMode,
    is_v2_deleted_object: bool = False,
) -> "StripeObject": ...


@overload
def _convert_to_stripe_object(
    *,
    resp: List[Resp],
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    requestor: "_APIRequestor",
    api_mode: ApiMode,
    is_v2_deleted_object: bool = False,
) -> List["StripeObject"]: ...


def _convert_to_stripe_object(
    *,
    resp: Resp,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    requestor: "_APIRequestor",
    api_mode: ApiMode,
    # if true, we should ignore the `object` field for finding the class name. This is set by the API requestor
    is_v2_deleted_object: bool = False,
) -> Union["StripeObject", List["StripeObject"]]:
    """Turn a response payload into StripeObject instance(s).

    * A StripeResponse is unwrapped to its ``data``; the response itself
      is passed on as ``last_response`` of the constructed object.
    * A list is converted element by element. Only ``requestor``,
      ``api_mode`` and ``klass_`` are forwarded to the per-item calls.
    * A dict is copied and handed to the ``_construct_from`` of the class
      selected from its ``object`` field (or the fallbacks below).
    * Anything else is returned unchanged.

    :param params: Stored as ``_retrieve_params`` on the result only when
        its ``object`` attribute is ``"list"`` or ``"search_result"``.
    :param klass_: Class used when the dict has no string ``object`` field
        and does not look like a v2 list.
    """
    # Imports here at runtime to avoid circular dependencies
    from stripe._stripe_response import StripeResponse
    from stripe._stripe_object import StripeObject

    # Keep hold of a StripeResponse so the resulting object can expose the
    # raw API response information through last_response.
    last_response = None
    if isinstance(resp, StripeResponse):
        last_response = resp
        resp = cast(Resp, last_response.data)

    if isinstance(resp, list):
        return [
            _convert_to_stripe_object(
                resp=cast("Union[StripeResponse, Dict[str, Any]]", item),
                requestor=requestor,
                api_mode=api_mode,
                klass_=klass_,
            )
            for item in resp
        ]

    if not isinstance(resp, dict):
        return cast("StripeObject", resp)

    values = resp.copy()
    object_name = values.get("object")
    if isinstance(object_name, str):
        if is_v2_deleted_object:
            # circular import
            from stripe.v2._deleted_object import DeletedObject

            klass = DeletedObject
        elif api_mode == "V2" and object_name == "v2.core.event":
            from stripe.events._event_classes import get_v2_event_class

            klass = get_v2_event_class(values.get("type", ""))
        else:
            from stripe._object_classes import get_object_class

            klass = get_object_class(api_mode, object_name)
    # TODO: this is a horrible hack. The API needs
    # to return something for `object` here.
    elif "data" in values and "next_page_url" in values:
        klass = stripe.v2.ListObject
    elif klass_ is not None:
        klass = klass_
    else:
        klass = StripeObject

    converted = klass._construct_from(
        values=values,
        last_response=last_response,
        requestor=requestor,
        api_mode=api_mode,
    )

    # We only need to update _retrieve_params when special params were
    # actually passed. Otherwise, leave it as is as the list / search result
    # constructors will instantiate their own params.
    if params is not None and hasattr(converted, "object"):
        object_kind = getattr(converted, "object")
        if object_kind == "list" or object_kind == "search_result":
            converted._retrieve_params = params

    return converted
|
|
|
|
|
|
def convert_to_dict(obj):
    """Recursively convert a StripeObject into plain Python containers.

    Lists are converted element by element; StripeObjects (via their
    ``_data`` mapping) and dicts are converted value by value. Any other
    value is returned unchanged.

    :param obj: The StripeObject (or list / dict / scalar) to convert.

    :returns: The same data with every StripeObject replaced by a dict.
    """
    from stripe._stripe_object import StripeObject

    if isinstance(obj, list):
        return [convert_to_dict(item) for item in obj]

    if isinstance(obj, StripeObject):
        source = obj._data
    elif isinstance(obj, dict):
        source = obj
    else:
        return obj

    return {key: convert_to_dict(value) for key, value in source.items()}
|
|
|
|
|
|
@overload
def populate_headers(
    idempotency_key: str,
) -> Dict[str, str]: ...


@overload
def populate_headers(idempotency_key: None) -> None: ...


def populate_headers(
    idempotency_key: Union[str, None],
) -> Union[Dict[str, str], None]:
    """Build the request headers carrying an idempotency key.

    :param idempotency_key: The key to send, or None.
    :returns: ``{"Idempotency-Key": idempotency_key}`` when a key is
        given; None when ``idempotency_key`` is None.
    """
    if idempotency_key is None:
        return None
    return {"Idempotency-Key": idempotency_key}
|
|
|
|
|
|
# Module-level generic type variable.
# NOTE(review): nothing in the visible code refers to this name at module
# scope (class_method_variant declares its own `T`) - confirm whether it is
# still needed, e.g. by importers of this module.
T = TypeVar("T")
|
|
|
|
|
|
def merge_dicts(x, y):
    """Return a copy of ``x`` updated with the entries of ``y``.

    Neither argument is modified; on duplicate keys the value from ``y``
    wins.
    """
    merged = x.copy()
    merged.update(y)
    return merged
|
|
|
|
|
|
def sanitize_id(id):
    """Return ``id`` percent-encoded with ``urllib.parse.quote_plus``."""
    return quote_plus(id)
|
|
|
|
|
|
def get_api_mode(url: str) -> ApiMode:
    """Classify a request path as ``"V2"`` or ``"V1"``.

    :param url: The request path.
    :returns: ``"V2"`` when ``url`` starts with ``/v2``; ``"V1"``
        otherwise.
    """
    # if urls aren't explicitly marked as v2, they're assumed to be v1
    return "V2" if url.startswith("/v2") else "V1"
|
|
|
|
|
|
class class_method_variant(object):
|
|
def __init__(self, class_method_name):
|
|
self.class_method_name = class_method_name
|
|
|
|
T = TypeVar("T")
|
|
|
|
method: Any
|
|
|
|
def __call__(self, method: T) -> T:
|
|
self.method = method
|
|
return cast(T, self)
|
|
|
|
def __get__(self, obj, objtype: Optional[Type[Any]] = None):
|
|
@functools.wraps(self.method)
|
|
def _wrapper(*args, **kwargs):
|
|
if obj is not None:
|
|
# Method was called as an instance method, e.g.
|
|
# instance.method(...)
|
|
return self.method(obj, *args, **kwargs)
|
|
elif (
|
|
len(args) > 0
|
|
and objtype is not None
|
|
and isinstance(args[0], objtype)
|
|
):
|
|
# Method was called as a class method with the instance as the
|
|
# first argument, e.g. Class.method(instance, ...) which in
|
|
# Python is the same thing as calling an instance method
|
|
return self.method(args[0], *args[1:], **kwargs)
|
|
else:
|
|
# Method was called as a class method, e.g. Class.method(...)
|
|
class_method = getattr(objtype, self.class_method_name)
|
|
return class_method(*args, **kwargs)
|
|
|
|
return _wrapper
|