Skip to content

API Reference

Main modules

Cli

fetch_recently_played(spotify_client, num_tracks=5)

Fetch recently played tracks for a user using spotipy.

Parameters:

Name Type Description Default
spotify_client Spotify

Authenticated spotipy client.

required
num_tracks int

Number of tracks to fetch (max 50). Defaults to 5.

5

Returns:

Type Description
List[Dict]

A list of track dictionaries with relevant information including name, artist, album, popularity, external_urls, preview_url, played_at timestamp, and album_cover_url.

Raises:

Type Description
Exception

If the Spotify API request fails or returns invalid data.

Source code in spoteamfy/src/cli.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def _select_album_cover_url(images: List[Dict]) -> Optional[str]:
    """Pick an album cover URL, preferring a medium-sized (>= 300px) image.

    The last image whose height is at least 300 wins; when the list is
    ordered from largest to smallest that is the smallest image that is
    still at least 300px tall. If no image qualifies, the first image in
    the list is used as a fallback.

    Args:
        images: Image dictionaries, each with a ``url`` key and an optional
            ``height`` key that may be missing or ``None``.

    Returns:
        The chosen image URL, or None when there are no images.
    """
    cover_url = None
    for image in images or []:
        height = image.get("height")
        # A missing/None height must not be compared with an int (TypeError);
        # such an image is only eligible as the fallback.
        if height is not None and height >= 300:
            cover_url = image["url"]
        elif not cover_url:
            cover_url = image["url"]
    return cover_url


def fetch_recently_played(
    spotify_client: spotipy.Spotify, num_tracks: int = 5
) -> List[Dict]:
    """Fetch recently played tracks for a user using spotipy.

    Tracks that appear more than once in the history are reported only once
    (first occurrence wins), so the result may contain fewer than
    ``num_tracks`` entries.

    Args:
        spotify_client: Authenticated spotipy client.
        num_tracks: Number of tracks to fetch (max 50). Defaults to 5.

    Returns:
        A list of track dictionaries with relevant information including
        name, artist, album, popularity, external_urls, preview_url,
        played_at timestamp, and album_cover_url.

    Raises:
        Exception: If the Spotify API request fails or returns invalid data.
            The original error is attached as ``__cause__``.
    """
    try:
        results = spotify_client.current_user_recently_played(
            limit=min(num_tracks, 50)  # Spotify API limit is 50
        )

        tracks = []
        # Track ids already emitted, so a song played several times is
        # listed once while the original play order is preserved.
        seen_tracks = set()

        for item in results["items"]:
            track = item["track"]
            track_id = track["id"]

            if track_id in seen_tracks:
                continue
            seen_tracks.add(track_id)

            tracks.append(
                {
                    "name": track["name"],
                    "artist": ", ".join(
                        artist["name"] for artist in track["artists"]
                    ),
                    "album": track["album"]["name"],
                    "popularity": track["popularity"],
                    "external_urls": track["external_urls"]["spotify"],
                    "preview_url": track["preview_url"],
                    "played_at": item["played_at"],  # When it was played
                    "album_cover_url": _select_album_cover_url(
                        track["album"]["images"]
                    ),
                }
            )

            # Stop when we have enough unique tracks
            if len(tracks) >= num_tracks:
                break

        return tracks

    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise Exception(f"Failed to fetch recently played tracks: {e}") from e

format_tracks_for_teams(username, tracks)

Format track information as an adaptive card for Teams webhook.

Parameters:

Name Type Description Default
username str

Spotify username to include in the message.

required
tracks List[Dict]

List of track dictionaries to format.

required

Returns:

Type Description
Dict

A Teams message payload dictionary with an adaptive card containing formatted track information and album cover art from the most recent track. If no tracks are provided, returns a simple message card.

Source code in spoteamfy/src/cli.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def format_tracks_for_teams(username: str, tracks: List[Dict]) -> Dict:
    """Build the Teams webhook payload (an adaptive card) for a user's tracks.

    Args:
        username: Spotify username shown in the card text.
        tracks: Track dictionaries to render; the first entry is treated as
            the most recent one and supplies the album cover.

    Returns:
        A message payload dictionary holding a single adaptive-card
        attachment. With tracks, the card has a title, the optional album
        cover (plus album caption) of the first track, and a numbered list
        of tracks. Without tracks, the card holds a single
        "no tracks found" notice.
    """

    def _as_message(card_content: Dict) -> Dict:
        # Both payload variants share the same single-attachment envelope.
        return {
            "type": "message",
            "attachments": [
                {
                    "contentType": "application/vnd.microsoft.card.adaptive",
                    "content": card_content,
                }
            ],
        }

    # Guard clause: nothing to list, so emit only a short notice card.
    if not tracks:
        notice = f"No recently played tracks found for {username}"
        return _as_message(
            {
                "type": "AdaptiveCard",
                "version": "1.3",
                "body": [
                    {
                        "type": "TextBlock",
                        "text": notice,
                        "weight": "Bolder",
                        "size": "Medium",
                        "color": "Attention",
                    }
                ],
            }
        )

    total = len(tracks)
    latest = tracks[0]
    cover_url = latest.get("album_cover_url")

    # Card title.
    body = [
        {
            "type": "TextBlock",
            "text": f"Recently Played {total} Tracks for {username}",
            "weight": "Bolder",
            "size": "Large",
        }
    ]

    # Cover art and its caption come from the first (most recent) track.
    if cover_url:
        album_name = latest["album"]
        body += [
            {
                "type": "Image",
                "url": cover_url,
                "size": "Medium",
                "horizontalAlignment": "Center",
                "altText": f"Album cover for {album_name}",
            },
            {
                "type": "TextBlock",
                "text": f"Album: {album_name}",
                "size": "Small",
                "color": "Accent",
                "horizontalAlignment": "Center",
                "spacing": "None",
            },
        ]

    rows = []
    for position, entry in enumerate(tracks, start=1):
        number_column = {
            "type": "Column",
            "width": "auto",
            "items": [
                {
                    "type": "TextBlock",
                    "text": f"{position}.",
                    "weight": "Bolder",
                    "color": "Accent",
                }
            ],
        }
        details_column = {
            "type": "Column",
            "width": "stretch",
            "items": [
                {
                    "type": "TextBlock",
                    "text": f"**{entry['name']}**",
                    "weight": "Bolder",
                    "wrap": True,
                },
                {
                    "type": "TextBlock",
                    "text": f"by {entry['artist']}",
                    "size": "Small",
                    "color": "Default",
                    "spacing": "None",
                    "wrap": True,
                },
                {
                    "type": "TextBlock",
                    "text": f"Album: {entry['album']}",
                    "size": "Small",
                    "color": "Default",
                    "spacing": "None",
                    "wrap": True,
                },
            ],
        }
        row = {"type": "ColumnSet", "columns": [number_column, details_column]}

        # Make the row clickable when the track carries a Spotify link.
        link = entry.get("external_urls")
        if link:
            row["selectAction"] = {"type": "Action.OpenUrl", "url": link}

        rows.append(row)

        # An empty text block acts as a spacer between consecutive tracks.
        if position != total:
            rows.append(
                {
                    "type": "TextBlock",
                    "text": "",
                    "size": "Small",
                    "spacing": "Small",
                }
            )

    body.append({"type": "Container", "items": rows})

    return _as_message(
        {
            "type": "AdaptiveCard",
            "version": "1.3",
            "body": body,
            "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
        }
    )

get_users_json_path(cli_path=None)

Determine the path to the users.json file.

Priority: CLI argument > .env USERS_JSON_PATH > default ./config/users.json

Parameters:

Name Type Description Default
cli_path str

Optional path provided via CLI argument.

None

Returns:

Type Description
str

The resolved path to the users.json file.

Source code in spoteamfy/src/cli.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def get_users_json_path(cli_path: str = None) -> str:
    """Resolve which users.json file to use.

    The first non-empty source wins, checked in this order: the CLI
    argument, the USERS_JSON_PATH environment variable (after loading
    .env), then the default ./config/users.json.

    Args:
        cli_path: Path given on the command line, if any.

    Returns:
        The path of the users.json file to load.
    """
    # Load .env first so USERS_JSON_PATH defined there is visible to getenv.
    load_dotenv()
    # `or` short-circuits, so the environment is only consulted when no
    # CLI path was supplied.
    return cli_path or os.getenv("USERS_JSON_PATH") or "./config/users.json"

get_webhook_url(cli_webhook=None)

Determine the webhook URL to use.

Priority: CLI argument > .env WEBHOOK_URL > raise error

Parameters:

Name Type Description Default
cli_webhook str

Optional webhook URL provided via CLI argument.

None

Returns:

Type Description
str

The resolved webhook URL.

Raises:

Type Description
ValueError

If no webhook URL is provided via CLI or environment variable.

Source code in spoteamfy/src/cli.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_webhook_url(cli_webhook: str = None) -> str:
    """Resolve which Teams webhook URL to use.

    The CLI argument takes precedence; otherwise the WEBHOOK_URL
    environment variable (after loading .env) is used.

    Args:
        cli_webhook: Webhook URL given on the command line, if any.

    Returns:
        The webhook URL to post to.

    Raises:
        ValueError: If neither the CLI argument nor WEBHOOK_URL yields a
            non-empty value.
    """
    # Load .env first so WEBHOOK_URL defined there is visible to getenv.
    load_dotenv()
    resolved = cli_webhook or os.getenv("WEBHOOK_URL")
    if not resolved:
        raise ValueError(
            "No webhook URL provided. Use --teams-webhook or set WEBHOOK_URL in .env"
        )
    return resolved

load_users_from_json(json_path)

Load user credentials from a JSON file.

Each user must have: username, client_id, client_secret, redirect_uri, refresh_token.

Parameters:

Name Type Description Default
json_path str

Path to the JSON file containing user credentials.

required

Returns:

Type Description
List[Dict]

A list of dictionaries containing user credential information.

Raises:

Type Description
ValueError

If a user entry is missing required keys.

FileNotFoundError

If the JSON file cannot be found.

JSONDecodeError

If the JSON file is malformed.

Source code in spoteamfy/src/cli.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def load_users_from_json(json_path: str) -> List[Dict]:
    """Load user credentials from a JSON file.

    The file must contain a JSON array of objects. Each user must have:
    username, client_id, client_secret, redirect_uri, refresh_token.

    Args:
        json_path: Path to the JSON file containing user credentials.

    Returns:
        A list of dictionaries containing user credential information.

    Raises:
        ValueError: If the top-level value is not a list, an entry is not an
            object, or a user entry is missing required keys.
        FileNotFoundError: If the JSON file cannot be found.
        json.JSONDecodeError: If the JSON file is malformed.
    """
    # Decode explicitly as UTF-8 instead of the platform default encoding.
    with open(json_path, "r", encoding="utf-8") as f:
        users = json.load(f)

    if not isinstance(users, list):
        raise ValueError(
            "Users file must contain a JSON list of user entries, "
            f"got {type(users).__name__}"
        )

    required_keys = {
        "username",
        "client_id",
        "client_secret",
        "redirect_uri",
        "refresh_token",
    }
    for index, user in enumerate(users):
        if not isinstance(user, dict):
            raise ValueError(
                f"User entry {index} must be a JSON object, "
                f"got {type(user).__name__}"
            )
        missing = sorted(required_keys - user.keys())
        if missing:
            # Report only key names and the username: the entry itself holds
            # client_secret / refresh_token and must not end up in
            # error output.
            raise ValueError(
                f"User entry missing required keys: {', '.join(missing)} "
                f"(entry {index}, "
                f"username={user.get('username', '<unknown>')!r})"
            )
    return users

main(num_tracks, users_json, teams_webhook)

CLI entry point for Spotify Teams utility.

Fetches recently played tracks from Spotify for configured users and posts formatted messages to Microsoft Teams via webhook.

Parameters:

Name Type Description Default
num_tracks int

Number of recently played tracks to fetch per user.

required
users_json Optional[str]

Path to JSON file containing user credentials.

required
teams_webhook Optional[str]

Teams webhook URL for posting messages.

required
Source code in spoteamfy/src/cli.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
@click.command()
@click.option(
    "--num-tracks",
    default=5,
    help="Number of recently played tracks to fetch per user (max 50).",
)
@click.option(
    "--users-json",
    required=False,
    type=click.Path(exists=True),
    help=(
        "Path to JSON file with user credentials. "
        "Defaults to .env USERS_JSON_PATH or ./config/users.json."
    ),
)
@click.option(
    "--teams-webhook",
    required=False,
    help="Teams webhook URL for posting track info. Defaults to .env WEBHOOK_URL.",
)
def main(
    num_tracks: int,
    users_json: Optional[str],
    teams_webhook: Optional[str],
) -> None:
    """CLI entry point for Spotify Teams utility.

    Fetches recently played tracks from Spotify for configured users
    and posts formatted messages to Microsoft Teams via webhook.

    Args:
        num_tracks: Number of recently played tracks to fetch per user.
        users_json: Path to JSON file containing user credentials.
        teams_webhook: Teams webhook URL for posting messages.
    """
    # NOTE: click uses the docstring above as the command's --help text, so
    # it is user-facing output and is deliberately left untouched.
    users_path = get_users_json_path(users_json)

    # Without a webhook there is nowhere to post; report and bail out early.
    try:
        webhook = get_webhook_url(teams_webhook)
    except ValueError as err:
        click.echo(f"Error: {err}", err=True)
        return

    click.echo(
        f"Fetching {num_tracks} recently played tracks per user from "
        f"{users_path} and posting to Teams."
    )

    # Any failure while reading or validating the users file aborts the run.
    try:
        users = load_users_from_json(users_path)
        click.echo(f"Loaded user credentials for {len(users)} users.")
    except Exception as err:
        click.echo(f"Error loading users: {err}", err=True)
        return

    posted_ok = 0
    posted_failed = 0

    # Each user is handled independently: one user's failure is counted and
    # reported, then processing continues with the next user.
    for account in users:
        name = account.get("username", "<unknown>")
        click.echo(f"\nProcessing user: {name}")

        try:
            client = authenticate_user(account)
            click.echo(f"✓ Authentication successful for {name}")

            recent = fetch_recently_played(client, num_tracks)
            click.echo(f"✓ Fetched {len(recent)} recently played tracks for {name}")

            payload = format_tracks_for_teams(name, recent)

            delivered = post_to_teams(webhook, payload)
            if not delivered:
                click.echo(f"✗ Failed to post tracks for {name} to Teams")
                posted_failed += 1
            else:
                click.echo(f"✓ Posted tracks for {name} to Teams")
                posted_ok += 1

        except SpotifyAuthError as err:
            click.echo(f"✗ Authentication failed for {name}: {err}", err=True)
            posted_failed += 1
        except Exception as err:
            click.echo(f"✗ Error processing {name}: {err}", err=True)
            posted_failed += 1

    # Final tally of the run.
    summary_lines = (
        "\n=== Summary ===",
        f"Successful posts: {posted_ok}",
        f"Failed posts: {posted_failed}",
        f"Total users processed: {len(users)}",
    )
    for line in summary_lines:
        click.echo(line)

post_to_teams(webhook_url, message)

Post message to Microsoft Teams using webhook.

Parameters:

Name Type Description Default
webhook_url str

Teams webhook URL to post to.

required
message Dict

Message payload dictionary to send.

required

Returns:

Type Description
bool

True if the message was posted successfully, False otherwise.

Source code in spoteamfy/src/cli.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def post_to_teams(webhook_url: str, message: Dict, timeout: float = 10.0) -> bool:
    """Post message to Microsoft Teams using webhook.

    Args:
        webhook_url: Teams webhook URL to post to.
        message: Message payload dictionary to send (serialized as JSON).
        timeout: Seconds to wait for the webhook before giving up.
            Defaults to 10 seconds.

    Returns:
        True if the message was posted successfully, False otherwise
        (request error, timeout, or a non-2xx HTTP response). Failures are
        reported on stderr.
    """
    try:
        # Without an explicit timeout, requests waits indefinitely on an
        # unresponsive endpoint, which would stall the whole run.
        response = requests.post(
            webhook_url,
            json=message,
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        response.raise_for_status()
        return True
    except requests.RequestException as e:
        click.echo(f"Failed to post to Teams: {e}", err=True)
        return False

Spotify auth

SpotifyAuthError

Bases: Exception

Custom exception for Spotify authentication errors.

Source code in spoteamfy/src/spotify_auth.py
 7
 8
 9
10
class SpotifyAuthError(Exception):
    """Error raised when authenticating a user against Spotify fails."""

authenticate_user(user_credentials)

Authenticate a Spotify user using Spotipy and return a Spotipy client instance.

Parameters:

Name Type Description Default
user_credentials Dict[str, Any]

Dictionary containing user authentication credentials. Must include 'client_id', 'client_secret', 'redirect_uri', and 'refresh_token' keys.

required

Returns:

Type Description
Spotify

An authenticated spotipy.Spotify client instance.

Raises:

Type Description
SpotifyAuthError

If authentication fails due to invalid credentials, missing required fields, or API errors.

Source code in spoteamfy/src/spotify_auth.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def authenticate_user(user_credentials: Dict[str, Any]) -> spotipy.Spotify:
    """Authenticate a Spotify user using Spotipy and return a Spotipy client instance.

    The stored refresh token is exchanged for a fresh access token, which is
    then used to build the client.

    Args:
        user_credentials: Dictionary containing user authentication credentials.
            Must include 'client_id', 'client_secret', 'redirect_uri', and
            'refresh_token' keys. 'username' is optional and only used in
            error messages.

    Returns:
        An authenticated spotipy.Spotify client instance.

    Raises:
        SpotifyAuthError: If authentication fails due to invalid credentials,
            missing required fields, or API errors. Errors raised by
            underlying calls are attached as ``__cause__``.
    """
    try:
        # Updated scope to include user-top-read for getting top tracks
        scope = """
            user-read-recently-played
            user-top-read
            playlist-modify-public
            playlist-modify-private
        """

        sp_oauth = SpotifyOAuth(
            client_id=user_credentials["client_id"],
            client_secret=user_credentials["client_secret"],
            redirect_uri=user_credentials["redirect_uri"],
            scope=scope,
            cache_path=None,
        )

        refresh_token = user_credentials.get("refresh_token")
        if not refresh_token:
            raise SpotifyAuthError(
                "Missing or empty refresh_token in user credentials."
            )

        # Reject the template placeholder value before calling the API.
        if refresh_token.startswith("SPOTIFY_REFRESH_TOKEN"):
            raise SpotifyAuthError(
                "Refresh token is a placeholder. "
                "Use get_access_token.py script to get a real refresh token."
            )

        token_info = sp_oauth.refresh_access_token(refresh_token)
        return spotipy.Spotify(auth=token_info["access_token"])

    except SpotifyAuthError:
        # Raised deliberately above with a precise message; let it through
        # so the generic handler below does not relabel it as "unexpected".
        raise
    except SpotifyOauthError as e:
        raise SpotifyAuthError(
            "Authentication failed for user "
            f"{user_credentials.get('username', '')}: {e}"
        ) from e
    except KeyError as e:
        raise SpotifyAuthError(
            "Missing required credential field for user "
            f"{user_credentials.get('username', '')}: {e}"
        ) from e
    except Exception as e:
        raise SpotifyAuthError(
            "Unexpected authentication error for user "
            f"{user_credentials.get('username', '')}: {e}"
        ) from e

Utility scripts

Get access token

Script to fetch Spotify access tokens using spotipy. This script handles both getting initial refresh tokens and refreshing access tokens.

get_client_credentials_token(client_id, client_secret)

Get an access token using Client Credentials Flow (app-only, no user context).

This is useful for app-only requests but won't work for user-specific data.

Parameters:

Name Type Description Default
client_id str

Spotify app client ID.

required
client_secret str

Spotify app client secret.

required

Returns:

Type Description
Optional[str]

Access token string if successful, None otherwise.

Source code in scripts/get_access_token.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def get_client_credentials_token(client_id: str, client_secret: str) -> Optional[str]:
    """Get an access token using Client Credentials Flow (app-only, no user context).

    This is useful for app-only requests but won't work for user-specific data.

    Args:
        client_id: Spotify app client ID.
        client_secret: Spotify app client secret.

    Returns:
        Access token string if successful, None otherwise. Failures are
        reported on stdout rather than raised.
    """
    try:
        client_credentials_manager = SpotifyClientCredentials(
            client_id=client_id, client_secret=client_secret
        )

        # Request the bare token string so the result matches the declared
        # Optional[str] return type (same as_dict=False usage as in
        # get_initial_auth_for_user).
        access_token = client_credentials_manager.get_access_token(as_dict=False)

        # Announce success only after the token request has succeeded.
        print("Client Credentials token obtained successfully!")
        print(
            "Note: This token can only be used for app-only requests, "
            "not user-specific data."
        )

        return access_token

    except Exception as e:
        print(f"Error getting Client Credentials token: {e}")
        return None

get_initial_auth_for_user(username, client_id, client_secret, redirect_uri)

Get initial authorization for a user using Authorization Code Flow.

This will open a browser and require user interaction.

Parameters:

Name Type Description Default
username str

The username for display purposes.

required
client_id str

Spotify app client ID.

required
client_secret str

Spotify app client secret.

required
redirect_uri str

Redirect URI configured in Spotify app.

required

Returns:

Type Description
Optional[Dict]

Token info dictionary containing access_token, refresh_token, expires_at, and scope if successful, None otherwise.

Source code in scripts/get_access_token.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_initial_auth_for_user(
    username: str, client_id: str, client_secret: str, redirect_uri: str
) -> Optional[Dict]:
    """Get initial authorization for a user using Authorization Code Flow.

    Prints the authorization URL, asks the user to paste back the redirect
    URL they land on, and exchanges the code it contains for tokens. This
    requires user interaction on stdin.

    Args:
        username: The username for display purposes.
        client_id: Spotify app client ID.
        client_secret: Spotify app client secret.
        redirect_uri: Redirect URI configured in Spotify app.

    Returns:
        Token info dictionary containing access_token, refresh_token, expires_at,
        and scope if successful, None otherwise. Failures are reported on
        stdout rather than raised.
    """
    # OAuth scopes are a single-space-delimited list; avoid embedding
    # newlines/indentation in the value sent to the authorize endpoint.
    scope = (
        "user-top-read "
        "playlist-modify-public "
        "playlist-modify-private "
        "user-read-recently-played"
    )

    sp_oauth = SpotifyOAuth(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        scope=scope,
        show_dialog=True,  # Force show dialog even if user previously authorized
        # NOTE(review): cache_path=None appears to select spotipy's default
        # cache location rather than disabling caching (the code below reads
        # the token back via get_cached_token) - confirm against spotipy docs.
        cache_path=None,
    )

    print(f"Getting authorization for user: {username}")

    auth_url = sp_oauth.get_authorize_url()
    print("Please go to this URL to authorize the application:")
    print(f"{auth_url}")

    redirect_response = input("\nPaste the full redirect URL here: ").strip()

    try:
        code = sp_oauth.parse_response_code(redirect_response)
        if not code:
            print("Failed to extract authorization code from URL")
            return None

        # check_cache=False forces the code exchange. With the default, a
        # still-valid token left in the cache (possibly another user's)
        # would be returned instead of a token for this authorization.
        token_info = sp_oauth.get_access_token(
            code, as_dict=False, check_cache=False
        )

        if isinstance(token_info, str):
            # Only the access-token string came back; read the full token
            # info (refresh token, expiry, scope) saved by the exchange.
            full_token_info = sp_oauth.get_cached_token()
            if not full_token_info:
                print("Failed to get full token info")
                return None
            token_info = full_token_info

        if not (token_info and isinstance(token_info, dict)):
            print("Failed to get token info")
            return None

        print("\n=== SUCCESS ===")
        # The access token is truncated; the refresh token is printed in
        # full so it can be copied into users.json.
        print(f"Access Token: {token_info['access_token'][:20]}...")
        print(f"Refresh Token: {token_info['refresh_token']}")
        print(f"Expires at: {token_info['expires_at']}")
        print(f"Scope: {token_info['scope']}")

        return token_info
    except Exception as e:
        print(f"Error during authorization: {e}")
        return None

main()

Main function to handle token operations.

Provides an interactive menu for managing Spotify API tokens including getting initial authorization, refreshing tokens, testing tokens, and obtaining client credentials tokens.

Source code in scripts/get_access_token.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def _has_placeholder_credentials(client_id: str, client_secret: str) -> bool:
    """Return True if either credential still starts with its template placeholder."""
    return client_id.startswith("SPOTIFY_CLIENT_ID") or client_secret.startswith(
        "SPOTIFY_CLIENT_SECRET"
    )


def main() -> None:
    """Main function to handle token operations.

    Provides an interactive menu for managing Spotify API tokens including
    getting initial authorization, refreshing tokens, testing tokens, and
    obtaining client credentials tokens.

    The user list is read from ``../config/users.json`` relative to this
    script. Every failure (missing or malformed config, invalid menu input,
    placeholder credentials) is reported on stdout and the function returns
    without raising.
    """

    # Load users.json
    config_path = os.path.join(os.path.dirname(__file__), "..", "config", "users.json")

    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, "r", encoding="utf-8") as f:
            users = json.load(f)
    except FileNotFoundError:
        print(f"Error: Could not find {config_path}")
        return
    except json.JSONDecodeError as e:
        print(f"Error parsing users.json: {e}")
        return

    print("=== Spotify Token Manager ===")
    print("1. Get initial authorization (new refresh token)")
    print("2. Refresh existing access token")
    print("3. Test access token")
    print("4. Get Client Credentials token (app-only)")

    choice = input("\nSelect an option (1-4): ").strip()

    if choice not in ("1", "2", "3", "4"):
        print("Invalid choice")
        return

    # Every option needs at least one configured user; bail out early instead
    # of showing an empty list and asking for a selection that cannot succeed.
    if not users:
        print("No users found in config")
        return

    if choice == "4":
        # Get Client Credentials token (app-only, so no user selection needed)
        app_user = users[0]  # Use first user's credentials
        client_id = app_user["client_id"]
        client_secret = app_user["client_secret"]

        if _has_placeholder_credentials(client_id, client_secret):
            print(
                "Error: Please update users.json with real "
                "Spotify client credentials first"
            )
            return

        token_info = get_client_credentials_token(client_id, client_secret)
        if token_info:
            print(f"Access Token: {token_info['access_token'][:20]}...")
        return

    # Options 1-3 operate on a specific user: list them and ask which one.
    print("\nAvailable users:")
    for number, user in enumerate(users, start=1):
        print(f"{number}. {user['username']}")

    try:
        selection = input("\nEnter the number of the user: ")
        user_index = int(selection) - 1
    except ValueError:
        print("Invalid input")
        return

    if not 0 <= user_index < len(users):
        print("Invalid selection")
        return

    selected_user = users[user_index]
    username = selected_user["username"]
    client_id = selected_user["client_id"]
    client_secret = selected_user["client_secret"]
    redirect_uri = selected_user["redirect_uri"]
    refresh_token = selected_user.get("refresh_token", "")

    # Validate credentials
    if _has_placeholder_credentials(client_id, client_secret):
        print(
            "Error: Please update users.json with real "
            "Spotify client credentials first"
        )
        return

    if choice == "1":
        # Get initial authorization
        token_info = get_initial_auth_for_user(
            username, client_id, client_secret, redirect_uri
        )
        if token_info:
            print("\nUpdate users.json with this refresh token:")
            print(f"Replace '{refresh_token}' with '{token_info['refresh_token']}'")

    elif choice == "2":
        # Refresh existing token
        if not refresh_token or refresh_token.startswith("SPOTIFY_REFRESH_TOKEN"):
            print(
                "Error: No valid refresh token found. "
                "Use option 1 to get initial authorization first."
            )
            return

        token_info = refresh_access_token(client_id, client_secret, refresh_token)
        if token_info:
            print("You can now use this access token for API calls.")

    else:
        # choice == "3": test an access token supplied by the operator
        access_token = input("Enter the access token to test: ").strip()
        test_access_token(access_token)

refresh_access_token(client_id, client_secret, refresh_token)

Refresh an access token using a refresh token.

Parameters:

Name Type Description Default
client_id str

Spotify app client ID.

required
client_secret str

Spotify app client secret.

required
refresh_token str

Valid refresh token to use for getting new access token.

required

Returns:

Type Description
Optional[Dict]

Refreshed token info dictionary if successful, None otherwise.

Source code in scripts/get_access_token.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def refresh_access_token(
    client_id: str, client_secret: str, refresh_token: str
) -> Optional[Dict]:
    """Exchange a refresh token for a fresh access token.

    Args:
        client_id: Spotify app client ID.
        client_secret: Spotify app client secret.
        refresh_token: Refresh token to exchange for a new access token.

    Returns:
        The refreshed token info dictionary on success, or None when the
        refresh yields nothing or raises.
    """
    # The redirect URI is only a placeholder value here; caching is disabled.
    oauth_manager = SpotifyOAuth(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri="http://127.0.0.1:8080/callback",
        cache_path=None,
    )

    try:
        new_token = oauth_manager.refresh_access_token(refresh_token)

        # Guard clause: an empty result counts as a failed refresh.
        if not new_token:
            print("Failed to refresh access token")
            return None

        print("Access token refreshed successfully!")
        # Only the first 20 characters of the token are echoed.
        print(f"New Access Token: {new_token['access_token'][:20]}...")
        print(f"Expires at: {new_token['expires_at']}")
        return new_token

    except Exception as err:
        # Any failure during the refresh or the reporting is printed, not raised.
        print(f"Error refreshing token: {err}")
        return None

test_access_token(access_token)

Test if an access token works by making a simple API call.

Parameters:

Name Type Description Default
access_token str

Access token to test.

required

Returns:

Type Description
bool

True if the token is valid and works, False otherwise.

Source code in scripts/get_access_token.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def test_access_token(access_token: str) -> bool:
    """Test if an access token works by making a simple API call.

    Builds a spotipy client from the token and requests the current user's
    profile; the outcome is reported on stdout.

    Args:
        access_token: Access token to test.

    Returns:
        True if the profile request succeeds, False if anything raises.
    """
    try:
        sp = spotipy.Spotify(auth=access_token)
        user_info = sp.current_user()
        # The trailing space keeps the display name and the "(ID: ...)" part
        # from running together once the adjacent f-strings are concatenated.
        print(
            f"Token test successful! User: {user_info['display_name']} "
            f"(ID: {user_info['id']})"
        )
        return True
    except Exception as e:
        # Script-level boundary: report any failure instead of raising.
        print(f"Token test failed: {e}")
        return False

Auth validator

Simple authentication validation script for Spotify users.

validate_user_auth(username)

Validate authentication for a specific user and fetch their profile info.

Parameters:

Name Type Description Default
username str

The username to validate authentication for.

required

Returns:

Type Description
bool

True if authentication is successful, False otherwise.

Source code in scripts/auth_validator.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def validate_user_auth(username: str) -> bool:
    """Validate authentication for a specific user and fetch their profile info.

    Loads ``config/users.json`` from the directory two levels above this
    file, looks up the entry whose ``username`` matches, authenticates with
    it, then fetches the user's profile and up to three recently played
    tracks. Progress and failures are reported on stdout.

    Args:
        username: The username to validate authentication for.

    Returns:
        True if authentication and both API calls succeed. False if the
        config cannot be loaded, the user is not found, the refresh token is
        missing or still a placeholder, or any step raises.
    """

    # Load users.json
    project_root = Path(__file__).resolve().parent.parent
    config_path = project_root / "config" / "users.json"

    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, "r", encoding="utf-8") as f:
            users = json.load(f)
    except Exception as e:
        print(f"Failed to load users from {config_path}: {e}")
        return False

    # Find the user (first entry with a matching username)
    user_creds = next((u for u in users if u.get("username") == username), None)

    if not user_creds:
        print(f"User '{username}' not found in users.json")
        print(f"Available users: {[u.get('username') for u in users]}")
        return False

    print(f"Testing authentication for user: {username}")
    # Use .get so an incomplete entry is reported rather than raising KeyError.
    print(f"Client ID: {user_creds.get('client_id', 'N/A')}")
    print(f"Redirect URI: {user_creds.get('redirect_uri', 'N/A')}")

    # Check that a real refresh token is configured (absent, empty, or
    # placeholder values cannot be used to authenticate).
    refresh_token = user_creds.get("refresh_token")
    token_is_missing = not refresh_token
    if token_is_missing or str(refresh_token).startswith("SPOTIFY_REFRESH_TOKEN"):
        if token_is_missing:
            print("❌ No refresh token configured for this user.")
        else:
            print(f"❌ Refresh token is still a placeholder: {refresh_token}")
        print(
            "Please run 'python scripts/get_access_token.py' "
            "to get a real refresh token first."
        )
        return False

    try:
        # Authenticate using spotipy
        spotify_client = authenticate_user(user_creds)
        print("✅ Authentication successful!")

        # Test by getting user profile
        user_profile = spotify_client.current_user()
        print("✅ Profile retrieved successfully!")
        # "followers" may be absent or null; fall back to an empty mapping.
        followers = user_profile.get("followers") or {}
        print(f"   Display Name: {user_profile.get('display_name', 'N/A')}")
        print(f"   Spotify ID: {user_profile.get('id', 'N/A')}")
        print(f"   Followers: {followers.get('total', 'N/A')}")
        print(f"   Country: {user_profile.get('country', 'N/A')}")

        # Test fetching recently played tracks
        print("\n🎵 Testing recently played tracks retrieval...")
        recent_tracks = spotify_client.current_user_recently_played(limit=3)

        if recent_tracks["items"]:
            print("✅ Recently played tracks retrieved successfully!")
            print(f"   Found {len(recent_tracks['items'])} tracks:")
            for i, item in enumerate(recent_tracks["items"][:3], 1):
                track = item["track"]
                artists = ", ".join(artist["name"] for artist in track["artists"])
                print(f"   {i}. {track['name']} by {artists}")
        else:
            print(
                "⚠️  No recently played tracks found "
                "(this is normal if you haven't listened to music recently)"
            )

        return True

    except SpotifyAuthError as e:
        print(f"❌ Authentication failed: {e}")
        return False
    except Exception as e:
        # Script-level boundary: report anything else instead of raising.
        print(f"❌ Unexpected error: {e}")
        return False