from __future__ import absolute_import
import json
import logging
from datetime import datetime
import pkg_resources
import requests
from linode.errors import ApiError, UnexpectedResponseError
from linode.objects import *
from linode.objects.filtering import Filter
from .common import load_and_validate_keys
from .paginated_list import PaginatedList
# Version string of the installed "linode-api" distribution; LinodeClient's
# _user_agent property embeds it in the User-Agent header of every request.
package_version = pkg_resources.require("linode-api")[0].version
# Module-level logger; LinodeClient._api_call uses it to report "Warning"
# headers returned by the server.
logger = logging.getLogger(__name__)
class Group:
    """
    Common base for the method groups attached to a client.  A group holds
    nothing except a reference to the client it was created with.
    """
    def __init__(self, client):
        # every method of a concrete group sends its requests through this
        self.client = client
class LinodeGroup(Group):
    """
    Encapsulates Linode-related methods of the :any:`LinodeClient`.  This
    should not be instantiated on its own, but should instead be used through
    an instance of :any:`LinodeClient`::

       client = LinodeClient(token)
       linodes = client.linode.get_instances() # use the LinodeGroup

    This group contains all features beneath the `/linode` group in the API v4.
    """
    def get_types(self, *filters):
        """
        Returns a list of Linode types.  These may be used to create or resize
        Linodes, or simply referenced on their own.  Types can be filtered to
        return specific types, for example::

           standard_types = client.linode.get_types(Type.class == "standard")

        :param filters: Any number of filters to apply to the query.

        :returns: A list of types that match the query.
        :rtype: PaginatedList of Type
        """
        return self.client._get_and_filter(Type, *filters)

    def get_instances(self, *filters):
        """
        Returns a list of Linodes on your account.  You may filter this query
        to return only Linodes that match specific criteria::

           prod_linodes = client.linode.get_instances(Linode.group == "prod")

        :param filters: Any number of filters to apply to this query.

        :returns: A list of Linodes that matched the query.
        :rtype: PaginatedList of Linode
        """
        return self.client._get_and_filter(Linode, *filters)

    def get_stackscripts(self, *filters, **kwargs):
        """
        Returns a list of :any:`StackScripts<StackScript>`, both public and
        private.  You may filter this query to return only
        :any:`StackScripts<StackScript>` that match certain criteria.  You may
        also request only your own private :any:`StackScripts<StackScript>`::

           my_stackscripts = client.linode.get_stackscripts(mine_only=True)

        :param filters: Any number of filters to apply to this query.
        :param mine_only: If True, returns only private StackScripts
        :type mine_only: bool

        :returns: A list of StackScripts matching the query.
        :rtype: PaginatedList of StackScript

        :raises TypeError: If any keyword argument other than ``mine_only``
                           is given.
        """
        # python2 can't handle *args and a single keyword argument, so
        # mine_only is extracted from **kwargs by hand
        if 'mine_only' in kwargs:
            if kwargs['mine_only']:
                new_filter = Filter({"mine": True})
                if filters:
                    # fold the "mine" condition into the first filter; the
                    # remaining filters are passed through untouched
                    filters = list(filters)
                    filters[0] = filters[0] & new_filter
                else:
                    filters = [new_filter]

            del kwargs['mine_only']

        if kwargs:
            raise TypeError("get_stackscripts() got unexpected keyword argument '{}'".format(kwargs.popitem()[0]))

        return self.client._get_and_filter(StackScript, *filters)

    def get_kernels(self, *filters):
        """
        Returns a list of available :any:`Kernels<Kernel>`.  Kernels are used
        when creating or updating :any:`LinodeConfigs<LinodeConfig>`.

        :param filters: Any number of filters to apply to this query.

        :returns: A list of available kernels that match the query.
        :rtype: PaginatedList of Kernel
        """
        return self.client._get_and_filter(Kernel, *filters)

    # create things
    def create_instance(self, ltype, region, image=None,
            authorized_keys=None, **kwargs):
        """
        Creates a new Linode.  This function has several modes of operation:

        **Create a Linode from an Image**

        To create a Linode from an :any:`Image`, call `create_instance` with
        a :any:`Type`, a :any:`Region`, and an :any:`Image`.  All three of
        these fields may be provided as either the ID or the appropriate object.
        In this mode, a root password will be generated and returned with the
        new Linode object.  For example::

           new_linode, password = client.linode.create_instance(
               "g5-standard-1",
               "us-east",
               image="linode/debian9")

           ltype = client.linode.get_types().first()
           region = client.get_regions().first()
           image = client.get_images().first()

           another_linode, password = client.linode.create_instance(
               ltype,
               region,
               image=image)

        **Create a Linode from StackScript**

        When creating a Linode from a :any:`StackScript`, an :any:`Image` that
        the StackScript supports must be provided.  You must also provide any
        required StackScript data for the script's User Defined Fields.  For
        example, if deploying `StackScript 10079`_ (which deploys a new Linode
        with a user created from keys on `github`_)::

           stackscript = StackScript(client, 10079)

           new_linode, password = client.linode.create_instance(
              "g5-standard-2",
              "us-east",
              image="linode/debian9",
              stackscript=stackscript,
              stackscript_data={"gh_username": "example"})

        In the above example, "gh_username" is the name of a User Defined Field
        in the chosen StackScript.  For more information on StackScripts, see
        the `StackScript guide`_.

        .. _`StackScript 10079`: https://www.linode.com/stackscripts/view/10079
        .. _`github`: https://github.com
        .. _`StackScript guide`: https://www.linode.com/docs/platform/stackscripts/

        **Create a Linode from a Backup**

        To create a new Linode by restoring a :any:`Backup` to it, provide a
        :any:`Type`, a :any:`Region`, and the :any:`Backup` to restore.  You
        may provide either IDs or objects for all of these fields::

           existing_linode = Linode(client, 123)
           snapshot = existing_linode.available_backups.snapshot.current

           new_linode = client.linode.create_instance(
               "g5-standard-1",
               "us-east",
               backup=snapshot)

        **Create an empty Linode**

        If you want to create an empty Linode that you will configure manually,
        simply call `create_instance` with a :any:`Type` and a :any:`Region`::

           empty_linode = client.linode.create_instance("g5-standard-2", "us-east")

        When created this way, the Linode will not be booted and cannot boot
        successfully until disks and configs are created, or it is otherwise
        configured.

        :param ltype: The Linode Type we are creating
        :type ltype: str or LinodeType
        :param region: The Region in which we are creating the Linode
        :type region: str or Region
        :param image: The Image to deploy to this Linode.  If this is provided
                      and no root_pass is given, a password will be generated
                      and returned along with the new Linode.
        :type image: str or Image
        :param stackscript: The StackScript to deploy to the new Linode.  If
                            provided, "image" is required and must be compatible
                            with the chosen StackScript.
        :type stackscript: int or StackScript
        :param stackscript_data: Values for the User Defined Fields defined in
                                 the chosen StackScript.  Does nothing if
                                 StackScript is not provided.
        :type stackscript_data: dict
        :param backup: The Backup to restore to the new Linode.  May not be
                       provided if "image" is given.
        :type backup: int or Backup
        :param authorized_keys: The ssh public keys to install in the linode's
                                /root/.ssh/authorized_keys file.  Each entry may
                                be a single key, or a path to a file containing
                                the key.
        :type authorized_keys: list or str
        :param label: The display label for the new Linode
        :type label: str
        :param group: The display group for the new Linode
        :type group: str
        :param booted: Whether the new Linode should be booted.  This will
                       default to True if the Linode is deployed from an Image
                       or Backup.
        :type booted: bool

        :returns: A new Linode object, or a tuple containing the new Linode and
                  the generated password.
        :rtype: Linode or tuple(Linode, str)
        :raises ApiError: If contacting the API fails
        :raises UnexpectedResponseError: If the API response is somehow malformed.
                                         This usually indicates that you are using
                                         an outdated library.
        """
        # only invent a password when deploying an image and the caller did
        # not supply one; a generated password is handed back to the caller
        ret_pass = None
        if image and 'root_pass' not in kwargs:
            ret_pass = Linode.generate_root_password()
            kwargs['root_pass'] = ret_pass

        authorized_keys = load_and_validate_keys(authorized_keys)

        if "stackscript" in kwargs:
            # translate stackscripts: the API takes an id, callers may pass
            # either the id or the object
            stackscript = kwargs.pop("stackscript")
            kwargs["stackscript_id"] = (stackscript.id if isinstance(stackscript, Base)
                                        else stackscript)

        if "backup" in kwargs:
            # translate backups the same way
            backup = kwargs.pop("backup")
            kwargs["backup_id"] = (backup.id if isinstance(backup, Base)
                                   else backup)

        params = {
            'type': ltype.id if issubclass(type(ltype), Base) else ltype,
            'region': region.id if issubclass(type(region), Base) else region,
            'image': (image.id if issubclass(type(image), Base) else image) if image else None,
            'authorized_keys': authorized_keys,
        }
        # remaining keyword arguments go to the API as-is and may override
        # the keys above
        params.update(kwargs)

        result = self.client.post('/linode/instances', data=params)

        if 'id' not in result:
            raise UnexpectedResponseError('Unexpected response when creating linode!', json=result)

        l = Linode(self.client, result['id'], result)
        if not ret_pass:
            return l
        return l, ret_pass

    def create_stackscript(self, label, script, images, desc=None, public=False, **kwargs):
        """
        Creates a new :any:`StackScript` on your account.

        :param label: The label for this StackScript.
        :type label: str
        :param script: The script to run when a :any:`Linode` is deployed with
                       this StackScript.  Must begin with a shebang (#!).  If
                       it does not, it is treated as the path of a file to
                       read the script from.
        :type script: str
        :param images: A list of :any:`Images<Image>` that this StackScript
                       supports.  Linodes will not be deployed from this
                       StackScript unless deployed from one of these Images.
                       A single Image or a single image id string is also
                       accepted.
        :type images: list of Image
        :param desc: A description for this StackScript.
        :type desc: str
        :param public: Whether this StackScript is public.  Defaults to False.
                       Once a StackScript is made public, it may not be set
                       back to private.
        :type public: bool

        :returns: The new StackScript
        :rtype: StackScript

        :raises ValueError: If images is of an unsupported type, or script is
                            neither script text nor the path of a file.
        :raises UnexpectedResponseError: If the API response has no id.
        """
        # isinstance (rather than an exact type match) so subclasses of the
        # accepted types are handled too
        if isinstance(images, (list, PaginatedList)):
            image_list = [d.id if issubclass(type(d), Base) else d for d in images]
        elif isinstance(images, Image):
            image_list = [images.id]
        elif isinstance(images, str):
            image_list = [images]
        else:
            raise ValueError('images must be a list of Images or a single Image')

        script_body = script
        if not script.startswith("#!"):
            # it doesn't look like a stackscript body, let's see if it's a file
            import os
            if os.path.isfile(script):
                with open(script) as f:
                    script_body = f.read()
            else:
                raise ValueError("script must be the script text or a path to a file")

        params = {
            "label": label,
            "image": image_list,
            "is_public": public,
            "script": script_body,
            "description": desc if desc else '',
        }
        params.update(kwargs)

        result = self.client.post('/linode/stackscripts', data=params)

        if 'id' not in result:
            raise UnexpectedResponseError('Unexpected response when creating StackScript!', json=result)

        s = StackScript(self.client, result['id'], result)
        return s
class ProfileGroup(Group):
    """
    Collections related to your user.
    """
    def get_tokens(self, *filters):
        """
        Returns the Personal Access Tokens active for this user.

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(PersonalAccessToken, *filters)

    def create_personal_access_token(self, label=None, expiry=None, scopes=None, **kwargs):
        """
        Creates and returns a new Personal Access Token.

        :param label: Label for the token; omitted from the request if falsy.
        :param expiry: Expiry for the token.  A datetime is converted to a
                       "%Y-%m-%dT%H:%M:%S" string; any other value is sent
                       unchanged.  Omitted from the request if falsy.
        :param scopes: Scopes for the token; omitted from the request if falsy.

        :returns: The new token.
        :rtype: PersonalAccessToken
        :raises UnexpectedResponseError: If the response has no id.
        """
        if expiry and isinstance(expiry, datetime):
            expiry = datetime.strftime(expiry, "%Y-%m-%dT%H:%M:%S")

        # copy only the values that were actually supplied into the payload
        supplied = (('label', label), ('expiry', expiry), ('scopes', scopes))
        for field, value in supplied:
            if value:
                kwargs[field] = value

        response = self.client.post('/account/tokens', data=kwargs)

        if 'id' not in response:
            raise UnexpectedResponseError(
                'Unexpected response when creating Personal Access Token!',
                json=response)

        return PersonalAccessToken(self.client, response['id'], response)

    def get_apps(self, *filters):
        """
        Returns the Authorized Applications for this user.

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(AuthorizedApp, *filters)
class LongviewGroup(Group):
    """
    Methods for listing and creating Longview resources through the client.
    """
    def get_clients(self, *filters):
        """
        Requests and returns a paginated list of LongviewClients on your
        account.

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(LongviewClient, *filters)

    def create_client(self, label=None):
        """
        Creates a new LongviewClient, optionally with a given label.

        :param label: The label for the new client.  If None, a default label based
            on the new client's ID will be used.

        :returns: A new LongviewClient

        :raises ApiError: If a non-200 status code is returned
        :raises UnexpectedResponseError: If the returned data from the api does
            not look as expected.
        """
        payload = {"label": label}
        response = self.client.post('/longview/clients', data=payload)

        if 'id' not in response:
            raise UnexpectedResponseError(
                'Unexpected response when creating Longivew Client!',
                json=response)

        return LongviewClient(self.client, response['id'], response)

    def get_subscriptions(self, *filters):
        """
        Requests and returns a paginated list of LongviewSubscriptions available.

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(LongviewSubscription, *filters)
class AccountGroup(Group):
    """
    Methods for account-level resources: events, settings, billing records,
    OAuth clients, users and the transfer pool.
    """
    def get_events(self, *filters):
        """
        Returns the Events on this account.

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(Event, *filters)

    def mark_last_seen_event(self, event):
        """
        Marks event as the last event we have seen.  If event is an int, it is treated
        as an event_id, otherwise it should be an event object whose id will be used.
        """
        last_seen = event if isinstance(event, int) else event.id
        # the endpoint template is filled in from the attributes of the model
        self.client.post('{}/seen'.format(Event.api_endpoint), model=Event(self.client, last_seen))

    def get_settings(self):
        """
        Returns the account settings data for this account.  This is not a
        listing endpoint.

        :raises UnexpectedResponseError: If the response has no "managed" key.
        """
        result = self.client.get('/account/settings')

        if 'managed' not in result:
            raise UnexpectedResponseError('Unexpected response when getting account settings!',
                    json=result)

        s = AccountSettings(self.client, result['managed'], result)
        return s

    def get_invoices(self):
        """
        Returns Invoices issued to this account
        """
        return self.client._get_and_filter(Invoice)

    def get_payments(self):
        """
        Returns a list of Payments made to this account
        """
        return self.client._get_and_filter(Payment)

    def get_oauth_clients(self, *filters):
        """
        Returns the OAuth Clients associated to this account

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(OAuthClient, *filters)

    def create_oauth_client(self, name, redirect_uri, **kwargs):
        """
        Make a new OAuth Client and return it

        :param name: Sent to the API as the client's "label".
        :param redirect_uri: Sent to the API as "redirect_uri".
        :param kwargs: Extra fields merged into the request body.

        :raises UnexpectedResponseError: If the response has no id.
        """
        params = {
            "label": name,
            "redirect_uri": redirect_uri,
        }
        params.update(kwargs)

        result = self.client.post('/account/oauth-clients', data=params)

        if 'id' not in result:
            raise UnexpectedResponseError('Unexpected response when creating OAuth Client!',
                    json=result)

        c = OAuthClient(self.client, result['id'], result)
        return c

    def get_users(self, *filters):
        """
        Returns a list of users on this account

        :param filters: Any number of filters to apply to the query.
        """
        return self.client._get_and_filter(User, *filters)

    def get_transfer(self):
        """
        Returns a MappedObject containing the account's transfer pool data

        :raises UnexpectedResponseError: If the response has no "used" key.
        """
        result = self.client.get('/account/transfer')

        if 'used' not in result:
            # attach the payload, as every other method in this module does
            raise UnexpectedResponseError('Unexpected response when getting Transfer Pool!',
                    json=result)

        return MappedObject(**result)

    def create_user(self, email, username, password, restricted=True):
        """
        Creates a user

        :param email: Email address for the new user.
        :param username: Username for the new user.
        :param password: Password for the new user.
        :param restricted: Sent to the API as "restricted".  Defaults to True.

        :returns: The new User.
        :rtype: User
        :raises UnexpectedResponseError: If the response lacks any of
                                         "email", "restricted" or "username".
        """
        params = {
            "email": email,
            "username": username,
            "password": password,
            "restricted": restricted
        }
        result = self.client.post('/account/users', data=params)

        # each key must be tested on its own: "not 'email' and ... in result"
        # parses as (not 'email') and ..., which is always False
        if not all(key in result for key in ('email', 'restricted', 'username')):
            raise UnexpectedResponseError('Unexpected response when creating user!', json=result)

        u = User(self.client, result['username'], result)
        return u
class NetworkingGroup(Group):
    """
    Methods for the networking resources of an account: IP addresses, IPv6
    ranges and pools, and IP assignment/allocation.
    """
    def get_ips(self, *filters):
        """
        Returns the IPAddresses matching the given filters.
        """
        return self.client._get_and_filter(IPAddress, *filters)

    def get_ipv6_ranges(self, *filters):
        """
        Returns the IPv6Ranges matching the given filters.
        """
        return self.client._get_and_filter(IPv6Range, *filters)

    def get_ipv6_pools(self, *filters):
        """
        Returns the IPv6Pools matching the given filters.
        """
        return self.client._get_and_filter(IPv6Pool, *filters)

    def assign_ips(self, region, *assignments):
        """
        Redistributes :any:`IP Addresses<IPAddress>` within a single region.
        This function takes a :any:`Region` and a list of assignments to make,
        then requests that the assignments take place.  If any :any:`Linode`
        ends up without a public IP, or with more than one private IP, all of
        the assignments will fail.

        Example usage::

           linode1 = Linode(client, 123)
           linode2 = Linode(client, 456)

           # swap IPs between linodes 1 and 2
           client.networking.assign_ips(linode1.region,
                                        linode1.ips.ipv4.public[0].to(linode2),
                                        linode2.ips.ipv4.public[0].to(linode1))

        :param region: The Region in which the assignments should take place.
                       All Linodes and IPAddresses involved in the assignment
                       must be within this region.
        :type region: str or Region
        :param assignments: Any number of assignments to make.  See
                            :any:`IPAddress.to` for details on how to construct
                            assignments.
        :type assignments: dict

        :raises ValueError: If an assignment lacks "address" or "linode_id".
        """
        # reject the first assignment that lacks either required key
        for assignment in assignments:
            if 'address' in assignment and 'linode_id' in assignment:
                continue
            raise ValueError("Invalid assignment: {}".format(assignment))

        region_id = region.id if isinstance(region, Region) else region

        payload = {
            "region": region_id,
            "assignments": list(assignments),
        }
        self.client.post('/networking/ip-assign', data=payload)

    def allocate_ip(self, linode):
        """
        Allocates an IP to a Linode you own.  Additional IPs must be requested
        by opening a support ticket first.

        :param linode: The Linode to allocate the new IP for.
        :type linode: Linode or int

        :returns: The new IPAddress
        :rtype: IPAddress
        :raises UnexpectedResponseError: If the response has no "address".
        """
        linode_id = linode.id if isinstance(linode, Base) else linode
        response = self.client.post('/networking/ipv4/', data={"linode_id": linode_id})

        if 'address' not in response:
            raise UnexpectedResponseError('Unexpected response when adding IPv4 address!',
                    json=response)

        return IPAddress(self.client, response['address'], response)
class SupportGroup(Group):
    """
    Methods for listing and opening support tickets.
    """
    def get_tickets(self, *filters):
        """
        Returns the SupportTickets matching the given filters.
        """
        return self.client._get_and_filter(SupportTicket, *filters)

    def open_ticket(self, summary, description, regarding=None):
        """
        Opens a new support ticket.

        :param summary: Sent to the API as the ticket's "summary".
        :param description: Sent to the API as the ticket's "description".
        :param regarding: Optional object the ticket is about.  Must be a
                          Linode, Domain, NodeBalancer or Volume; its id is
                          sent under the matching ``*_id`` key.

        :returns: The new ticket.
        :rtype: SupportTicket
        :raises ValueError: If regarding is of an unsupported type.
        :raises UnexpectedResponseError: If the response has no id.
        """
        payload = {
            "summary": summary,
            "description": description,
        }

        if regarding:
            # first matching type decides which id field is sent
            id_fields = (
                (Linode, 'linode_id'),
                (Domain, 'domain_id'),
                (NodeBalancer, 'nodebalancer_id'),
                (Volume, 'volume_id'),
            )
            for kind, field in id_fields:
                if isinstance(regarding, kind):
                    payload[field] = regarding.id
                    break
            else:
                raise ValueError('Cannot open ticket regarding type {}!'.format(type(regarding)))

        response = self.client.post('/support/tickets', data=payload)

        if 'id' not in response:
            raise UnexpectedResponseError('Unexpected response when creating ticket!',
                    json=response)

        return SupportTicket(self.client, response['id'], response)
class LinodeClient:
    """
    The main interface to the Linode API.  Holds the token and base URL,
    performs the HTTP calls, and exposes the method groups as attributes.
    """
    def __init__(self, token, base_url="https://api.linode.com/v4", user_agent=None):
        """
        The main interface to the Linode API.

        :param token: The authentication token to use for communication with the
                      API.  Can be either a Personal Access Token or an OAuth Token.
        :type token: str
        :param base_url: The base URL for API requests.  Generally, you shouldn't
                         change this.
        :type base_url: str
        :param user_agent: What to append to the User Agent of all requests made
                           by this client.  Setting this allows Linode's internal
                           monitoring applications to track the usage of your
                           application.  Setting this is not necessary, but some
                           applications may desire this behavior.
        :type user_agent: str
        """
        self.base_url = base_url
        self._add_user_agent = user_agent
        self.token = token

        #: Access methods related to Linodes - see :any:`LinodeGroup` for
        #: more information
        self.linode = LinodeGroup(self)

        #: Access methods related to your user - see :any:`ProfileGroup` for
        #: more information
        self.profile = ProfileGroup(self)

        #: Access methods related to your account - see :any:`AccountGroup` for
        #: more information
        self.account = AccountGroup(self)

        #: Access methods related to networking on your account - see
        #: :any:`NetworkingGroup` for more information
        self.networking = NetworkingGroup(self)

        #: Access methods related to support - see :any:`SupportGroup` for more
        #: information
        self.support = SupportGroup(self)

        #: Access information related to the Longview service - see
        #: :any:`LongviewGroup` for more information
        self.longview = LongviewGroup(self)

    @property
    def _user_agent(self):
        """
        The User-Agent header value: the caller-supplied prefix (if any), this
        library's name and version, then the default agent of ``requests``.
        """
        return '{}python-linode-api/{} {}'.format(
                '{} '.format(self._add_user_agent) if self._add_user_agent else '',
                package_version,
                requests.utils.default_user_agent()
        )

    def load(self, target_type, target_id, target_parent_id=None):
        """
        Constructs and immediately loads the object, circumventing the
        lazy-loading scheme by immediately making an API request.  Does not
        load related objects.

        For example, if you wanted to load a :any:`Linode` object with ID 123,
        you could do this::

           loaded_linode = client.load(Linode, 123)

        Similarly, if you instead wanted to load a :any:`NodeBalancerConfig`,
        you could do so like this::

           loaded_nodebalancer_config = client.load(NodeBalancerConfig, 456, 432)

        :param target_type: The type of object to create.
        :type target_type: type
        :param target_id: The ID of the object to create.
        :type target_id: int or str
        :param target_parent_id: The parent ID of the object to create, if
                                 applicable.
        :type target_parent_id: int, str, or None

        :returns: The resulting object, fully loaded.
        :rtype: target_type
        :raise ApiError: if the requested object could not be loaded.
        """
        result = target_type.make_instance(target_id, self, parent_id=target_parent_id)
        result._api_get()

        return result

    def _api_call(self, endpoint, model=None, method=None, data=None, filters=None):
        """
        Makes a call to the linode api.  Data should only be given if the method is
        POST or PUT, and should be a dictionary

        :param endpoint: Path appended to base_url.  If model is given, the
                         path is first formatted with the model's attributes.
        :param model: Optional object whose ``vars()`` fill in the endpoint.
        :param method: The callable that performs the request (for example
                       ``requests.get``).  Required.
        :param data: Optional request body; JSON-encoded before sending.
        :param filters: Optional filter dict; JSON-encoded into the
                        ``X-Filter`` header.

        :returns: The decoded JSON response, or None for a 204 response.
        :raises RuntimeError: If the client has no token.
        :raises ValueError: If method is not given.
        :raises ApiError: If the response status is in the 400-599 range.
        """
        if not self.token:
            raise RuntimeError("You do not have an API token!")

        if not method:
            raise ValueError("Method is required for API calls!")

        if model:
            endpoint = endpoint.format(**vars(model))
        url = '{}{}'.format(self.base_url, endpoint)
        headers = {
            'Authorization': "Bearer {}".format(self.token),
            'Content-Type': 'application/json',
            'User-Agent': self._user_agent,
        }

        if filters:
            headers['X-Filter'] = json.dumps(filters)

        body = None
        if data is not None:
            body = json.dumps(data)

        response = method(url, headers=headers, data=body)

        warning = response.headers.get('Warning', None)
        if warning:
            logger.warning('Received warning from server: %s', warning)

        if 399 < response.status_code < 600:
            j = None
            error_msg = '{}: '.format(response.status_code)
            try:
                j = response.json()
                if 'errors' in j.keys():
                    for e in j['errors']:
                        error_msg += '{}; '.format(e['reason']) \
                                if 'reason' in e.keys() else ''
            except Exception:
                # best effort only: a body that is not JSON, or not shaped as
                # expected, must not hide the ApiError raised below.  Unlike a
                # bare except, this lets KeyboardInterrupt/SystemExit through.
                pass

            raise ApiError(error_msg, status=response.status_code, json=j)

        if response.status_code != 204:
            j = response.json()
        else:
            j = None # handle no response body

        return j

    def _get_objects(self, endpoint, cls, model=None, parent_id=None, filters=None):
        """
        GETs a listing endpoint and wraps the returned "data" in a list of cls.

        :returns: A paginated list when the response has a "pages" key,
                  otherwise a plain list built from the response's "data".
        :raises UnexpectedResponseError: If the response has no "data" key.
        """
        response_json = self.get(endpoint, model=model, filters=filters)

        if "data" not in response_json:
            raise UnexpectedResponseError("Problem with response!", json=response_json)

        if 'pages' in response_json:
            formatted_endpoint = endpoint
            if model:
                formatted_endpoint = formatted_endpoint.format(**vars(model))
            # the first character of the endpoint is dropped for page_url
            # (presumably the leading slash; confirm against PaginatedList)
            return PaginatedList.make_paginated_list(response_json, self, cls,
                    parent_id=parent_id, page_url=formatted_endpoint[1:],
                    filters=filters)
        return PaginatedList.make_list(response_json["data"], self, cls,
                parent_id=parent_id)

    def get(self, *args, **kwargs):
        """Calls _api_call with ``requests.get`` as the method."""
        return self._api_call(*args, method=requests.get, **kwargs)

    def post(self, *args, **kwargs):
        """Calls _api_call with ``requests.post`` as the method."""
        return self._api_call(*args, method=requests.post, **kwargs)

    def put(self, *args, **kwargs):
        """Calls _api_call with ``requests.put`` as the method."""
        return self._api_call(*args, method=requests.put, **kwargs)

    def delete(self, *args, **kwargs):
        """Calls _api_call with ``requests.delete`` as the method."""
        return self._api_call(*args, method=requests.delete, **kwargs)

    # ungrouped list functions
    def get_regions(self, *filters):
        """
        Returns the available Regions for Linode products.

        :param filters: Any number of filters to apply to the query.

        :returns: A list of available Regions.
        :rtype: PaginatedList of Region
        """
        return self._get_and_filter(Region, *filters)

    def get_profile(self):
        """
        Retrieve the acting user's Profile, containing information about the
        current user such as their email address, username, and uid.

        :returns: The acting user's profile.
        :rtype: Profile
        :raises UnexpectedResponseError: If the response has no "username".
        """
        result = self.get('/profile')

        if 'username' not in result:
            raise UnexpectedResponseError('Unexpected response when getting profile!', json=result)

        p = Profile(self, result['username'], result)
        return p

    def get_account(self):
        """
        Retrieves information about the acting user's account, such as billing
        information.

        :returns: Returns the acting user's account information.
        :rtype: Account
        :raises UnexpectedResponseError: If the response has no "email".
        """
        result = self.get('/account')

        if 'email' not in result:
            raise UnexpectedResponseError('Unexpected response when getting account!', json=result)

        return Account(self, result['email'], result)

    def get_images(self, *filters):
        """
        Retrieves a list of available Images, including public and private
        Images available to the acting user.  You can filter this query to
        retrieve only Images relevant to a specific query, for example::

           debian_images = client.get_images(
               Image.vendor == "debian")

        :param filters: Any number of filters to apply to the query.

        :returns: A list of available Images.
        :rtype: PaginatedList of Image
        """
        return self._get_and_filter(Image, *filters)

    def create_image(self, disk, label=None, description=None):
        """
        Creates a new Image from a disk you own.

        :param disk: The Disk to imagize.
        :type disk: Disk or int
        :param label: The label for the resulting Image (defaults to the disk's
                      label).
        :type label: str
        :param description: The description for the new Image.
        :type description: str

        :returns: The new Image.
        :rtype: Image
        :raises UnexpectedResponseError: If the response has no id.
        """
        params = {
            "disk_id": disk.id if issubclass(type(disk), Base) else disk,
        }

        if label is not None:
            params["label"] = label

        if description is not None:
            params["description"] = description

        result = self.post('/images', data=params)

        if 'id' not in result:
            # attach the payload, as the other create_* methods do
            raise UnexpectedResponseError('Unexpected response when creating an '
                                          'Image from disk {}'.format(disk), json=result)

        return Image(self, result['id'], result)

    def get_domains(self, *filters):
        """
        Retrieves all of the Domains the acting user has access to.

        :param filters: Any number of filters to apply to this query.

        :returns: A list of Domains the acting user can access.
        :rtype: PaginatedList of Domain
        """
        return self._get_and_filter(Domain, *filters)

    def get_nodebalancers(self, *filters):
        """
        Retrieves all of the NodeBalancers the acting user has access to.

        :param filters: Any number of filters to apply to this query.

        :returns: A list of NodeBalancers the acting user can access.
        :rtype: PaginatedList of NodeBalancers
        """
        return self._get_and_filter(NodeBalancer, *filters)

    def create_nodebalancer(self, region, **kwargs):
        """
        Creates a new NodeBalancer in the given Region.

        :param region: The Region in which to create the NodeBalancer.
        :type region: Region or str

        :returns: The new NodeBalancer
        :rtype: NodeBalancer
        :raises UnexpectedResponseError: If the response has no id.
        """
        params = {
            "region": region.id if isinstance(region, Base) else region,
        }
        params.update(kwargs)

        result = self.post('/nodebalancers', data=params)

        if 'id' not in result:
            raise UnexpectedResponseError('Unexpected response when creating Nodebalaner!', json=result)

        n = NodeBalancer(self, result['id'], result)
        return n

    def create_domain(self, domain, master=True, **kwargs):
        """
        Registers a new Domain on the acting user's account.  Make sure to point
        your registrar to Linode's nameservers so that Linode's DNS manager will
        correctly serve your domain.

        :param domain: The domain to register to Linode's DNS manager.
        :type domain: str
        :param master: Whether this is a master (defaults to true)
        :type master: bool

        :returns: The new Domain object.
        :rtype: Domain
        :raises UnexpectedResponseError: If the response has no id.
        """
        params = {
            'domain': domain,
            'type': 'master' if master else 'slave',
        }
        params.update(kwargs)

        result = self.post('/domains', data=params)

        if 'id' not in result:
            raise UnexpectedResponseError('Unexpected response when creating Domain!', json=result)

        d = Domain(self, result['id'], result)
        return d

    def get_volumes(self, *filters):
        """
        Retrieves the Block Storage Volumes your user has access to.

        :param filters: Any number of filters to apply to this query.

        :returns: A list of Volumes the acting user can access.
        :rtype: PaginatedList of Volume
        """
        return self._get_and_filter(Volume, *filters)

    def create_volume(self, label, region=None, linode=None, size=20, **kwargs):
        """
        Creates a new Block Storage Volume, either in the given Region or
        attached to the given Linode.

        :param label: The label for the new Volume.
        :type label: str
        :param region: The Region to create this Volume in.  Not required if
                       `linode` is provided.
        :type region: Region or str
        :param linode: The Linode to attach this Volume to.  If not given, the
                       new Volume will not be attached to anything.
        :type linode: Linode or int
        :param size: The size, in GB, of the new Volume.  Defaults to 20.
        :type size: int

        :returns: The new Volume.
        :rtype: Volume
        :raises ValueError: If neither region nor linode is given.
        :raises UnexpectedResponseError: If the response has no id.
        """
        if not (region or linode):
            raise ValueError('region or linode required!')

        params = {
            "label": label,
            "size": size,
            "region": region.id if issubclass(type(region), Base) else region,
            "linode_id": linode.id if issubclass(type(linode), Base) else linode,
        }
        params.update(kwargs)

        result = self.post('/volumes', data=params)

        if 'id' not in result:
            raise UnexpectedResponseError('Unexpected response when creating volume!', json=result)

        v = Volume(self, result['id'], result)
        return v

    # helper functions
    def _filter_list(self, results, **filter_by):
        """
        Filters an already-fetched list of objects by attribute values.

        For each keyword, objects are kept when: both values are Base
        objects with equal ids; or both are strings and the filter value is a
        case-insensitive substring of the attribute; or the values are equal.
        The attribute types are judged from the first element of results.

        :returns: results unchanged if it is empty or no filters are given,
                  otherwise the filtered list.
        :raises ValueError: If a keyword is not an attribute of the objects.
        """
        if not results or not len(results):
            return results

        if not filter_by or not len(filter_by):
            return results

        for key in filter_by.keys():
            if key not in vars(results[0]):
                raise ValueError("Cannot filter {} by {}".format(type(results[0]), key))
            if isinstance(vars(results[0])[key], Base) and isinstance(filter_by[key], Base):
                results = [ r for r in results if vars(r)[key].id == filter_by[key].id ]
            elif isinstance(vars(results[0])[key], str) and isinstance(filter_by[key], str):
                results = [ r for r in results if filter_by[key].lower() in vars(r)[key].lower() ]
            else:
                results = [ r for r in results if vars(r)[key] == filter_by[key] ]

        return results

    def _get_and_filter(self, obj_type, *filters):
        """
        Lists objects of obj_type, applying the given filters.  Several
        filters are combined with ``and_``; the resulting ``dct`` is passed
        to _get_objects as the filter.
        """
        parsed_filters = None
        if filters:
            if(len(filters) > 1):
                parsed_filters = and_(*filters).dct # pylint: disable=no-value-for-parameter
            else:
                parsed_filters = filters[0].dct

        return self._get_objects(obj_type.api_list(), obj_type, filters=parsed_filters)