import json
import base64
import logging
from urllib.parse import urlencode
from http import HTTPStatus
import requests
[docs]class SpotifyApi(object):
def __init__(self):
# Load credentials
with open(".spotify-token.json") as f:
credentials = json.loads(f.read())
self._user_id = credentials['user_id']
self._client_id = credentials['client_id']
self._client_secret = credentials['client_secret']
self.redirect_uri = credentials['redirect_uri']
# Will be set after spotify auth
self._access_token = None
self._refresh_token = None
self._token_expires_in = None
self._token_type = None
def _client_credentials_authentication(self, authorization_code=None):
auth_header = base64.b64encode(
'{0}:{1}'.format(self._client_id, self._client_secret).encode('ascii')
)
response = requests.post(
url='https://accounts.spotify.com/api/token',
headers={
'Authorization': 'Basic {0}'.format(auth_header.decode('ascii'))
},
data={
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': self.redirect_uri,
'scope': 'playlist-modify-public playlist-modify-private'
}
)
return response.json()
def _authorization_code_flow_authentication(self):
url = 'https://accounts.spotify.com/authorize?{0}'.format(
urlencode({
'client_id': self._client_id,
'response_type': 'code',
'redirect_uri': self.redirect_uri,
'scope': 'playlist-modify-public playlist-modify-private',
'show_dialog': False
})
)
return url
[docs] def search_track(self, track_name, artist_name):
"""Search for a track using the Spotify Search API.
Args:
track_name (str): Track name
artist_name (str): Artist name
Returns:
dict: Dict containing the song attributes
"""
response = requests.get(
'https://api.spotify.com/v1/search',
params={
'q': '{0} artist:{1}'.format(track_name, artist_name),
'type': 'track',
'limit': 1
},
headers={
'Authorization': 'Bearer {0}'.format(self._access_token)
}
)
response_json = response.json()
track_attributes = {} # init track_attributes as an empty dict
if response.status_code != 200:
logging.error(response_json["error"]["message"])
elif response_json['tracks']['total'] == 0:
logging.error("empty search")
else:
track = response_json['tracks']['items'][0]
track_attributes = {
'song_name': track['name'],
'artist_name': track['artists'][0]['name'],
'album_name': track['album']['name'],
'popularity': track['popularity'],
'duration_ms': track['duration_ms'],
'explicit': track['explicit'],
'spotify_uri': track['uri'],
'album_image': track['album']['images'][0]['url']
}
return track_attributes
[docs] def check_playlist_exists(self, playlist_id):
response = requests.get(
"https://api.spotify.com/v1/users/ericda/playlists/{0}".format(playlist_id),
headers={'Authorization': 'Bearer {0}'.format(self._access_token)},
params=dict(
fields=["id"]
)
)
return response.status_code == HTTPStatus.OK
[docs] def get_track_uris_from_playlist(self, playlist_id):
"""Return the track URIs from the playlist
Returns:
set: the songs URIs
"""
offset = 0
batch_size = 100
tracks = []
while True:
response = requests.get(
'https://api.spotify.com/v1/users/{0}/playlists/{1}/tracks'.format(
self._user_id,
playlist_id
),
headers={'Authorization': 'Bearer {0}'.format(self._access_token)},
params={
'fields': ['items.track.uri'],
'limit': batch_size,
'offset': offset
}).json()
tracks_batch = set([t['track']['uri'] for t in response['items']])
tracks += tracks_batch
if len(tracks_batch) < batch_size:
break
offset += batch_size
return tracks
[docs] def add_tracks_to_playlist(self, track_uris, playlist_id):
"""Add spotify songs to playlist, using their URIs.
Args:
track_uris (list): List of songs URIs.
Returns:
json: Reponse from the Spotify API
"""
response = requests.post(
'https://api.spotify.com/v1/users/{0}/playlists/{1}/tracks'.format(
self._user_id,
playlist_id),
headers={'Authorization': 'Bearer {0}'.format(self._access_token)},
data=json.dumps({'uris': track_uris})
)
return response.json()