"""Generate Steam configuration files for the app ids given on the command line.

For every app id the script logs into Steam, fetches the product info and
writes the derived files (achievements, supported languages, build id, DLC
list, controller config, inventory items) into
``<appid>_output/steam_settings``.  Raw copies of the downloaded data are
kept under ``backup/<appid>``.
"""

# Leave both empty to be prompted for credentials interactively.
USERNAME = ""
PASSWORD = ""

#steam ids with public profiles that own a lot of games
TOP_OWNER_IDS = [
    76561198028121353,
    76561198001237877,
    76561198355625888,
    76561198001678750,
    76561198237402290,
    76561197979911851,
    76561198152618007,
    76561197969050296,
    76561198213148949,
    76561198037867621,
    76561198108581917,
]

from stats_schema_achievement_gen import achievements_gen
from controller_config_generator import parse_controller_vdf
from steam.client import SteamClient
from steam.client.cdn import CDNClient
from steam.enums import common
from steam.enums.common import EResult
from steam.enums.emsg import EMsg
from steam.core.msg import MsgProto
import os
import sys
import json
import urllib.request
import urllib.error
import threading
import queue

# Ask only once whether to keep retrying while Steam reports itself unavailable.
prompt_for_unavailable = True

USAGE = "\nUsage: {} appid appid appid etc..\n\nExample: {} 480\n"

if len(sys.argv) < 2:
    print(USAGE.format(sys.argv[0], sys.argv[0]))
    sys.exit(1)

try:
    appids = [int(arg) for arg in sys.argv[1:]]
except ValueError:
    # A non-numeric argument is a usage error, not a crash.
    print(USAGE.format(sys.argv[0], sys.argv[0]))
    sys.exit(1)

client = SteamClient()
os.makedirs("login_temp", exist_ok=True)
client.set_credential_location("login_temp")

if len(USERNAME) == 0 or len(PASSWORD) == 0:
    result = client.cli_login()
else:
    result = client.login(USERNAME, password=PASSWORD)
    auth_code, two_factor_code = None, None
    while result in (
        EResult.AccountLogonDenied,
        EResult.InvalidLoginAuthCode,
        EResult.AccountLoginDeniedNeedTwoFactor,
        EResult.TwoFactorCodeMismatch,
        EResult.TryAnotherCM,
        EResult.ServiceUnavailable,
        EResult.InvalidPassword,
    ):
        if result == EResult.InvalidPassword:
            print("invalid password, the password you set is wrong.")
            sys.exit(1)
        elif result in (EResult.AccountLogonDenied, EResult.InvalidLoginAuthCode):
            prompt = ("Enter email code: " if result == EResult.AccountLogonDenied else
                      "Incorrect code. Enter email code: ")
            auth_code, two_factor_code = input(prompt), None
        elif result in (EResult.AccountLoginDeniedNeedTwoFactor, EResult.TwoFactorCodeMismatch):
            prompt = ("Enter 2FA code: " if result == EResult.AccountLoginDeniedNeedTwoFactor else
                      "Incorrect code. Enter 2FA code: ")
            auth_code, two_factor_code = None, input(prompt)
        elif result in (EResult.TryAnotherCM, EResult.ServiceUnavailable):
            if prompt_for_unavailable and result == EResult.ServiceUnavailable:
                while True:
                    answer = input("Steam is down. Keep retrying? [y/n]: ").strip().lower()
                    # Compare against the two exact answers; a substring test
                    # against 'yn' would also accept "" and "yn".
                    if answer in ('y', 'n'):
                        break

                prompt_for_unavailable = False
                if answer == 'n':
                    break

            client.reconnect(maxdelay=15)

        result = client.login(USERNAME, PASSWORD, None, auth_code, two_factor_code)

# Everything below needs a logged-in client, so stop here on any failure
# (including the user declining to keep retrying).
if result != EResult.OK:
    print("login failed:", repr(result))
    sys.exit(1)


def get_stats_schema(client, game_id, owner_id):
    """Request the user-stats schema of ``game_id`` as seen by ``owner_id``.

    Sends a ``ClientGetUserStats`` message and returns the result of waiting
    up to 5 seconds for the ``ClientGetUserStatsResponse``; callers treat a
    ``None`` result as "no response".
    """
    message = MsgProto(EMsg.ClientGetUserStats)
    message.body.game_id = game_id
    message.body.schema_local_version = -1
    message.body.crc_stats = 0
    message.body.steam_id_for_user = owner_id

    client.send(message)
    return client.wait_msg(EMsg.ClientGetUserStatsResponse, timeout=5)


def download_achievement_images(game_id, image_names, output_folder):
    """Download the named achievement images of ``game_id`` into ``output_folder``.

    Each image is tried against both CDN hosts in turn.  Downloads run on up
    to 20 daemon threads; the function returns once every image has been
    processed.  Failures are printed and do not raise.
    """
    # Several achievements may share one icon: download each file only once so
    # two workers never write the same path concurrently.
    unique_names = list(dict.fromkeys(image_names))
    if not unique_names:
        return

    base_urls = (
        "https://cdn.akamai.steamstatic.com/steamcommunity/public/images/apps/",
        "https://cdn.cloudflare.steamstatic.com/steamcommunity/public/images/apps/",
    )
    q = queue.Queue()

    def download_one(name):
        """Return True once ``name`` was fetched from one of the hosts and saved."""
        for base in base_urls:
            url = "{}{}/{}".format(base, game_id, name)
            try:
                with urllib.request.urlopen(url) as response:
                    image_data = response.read()
            except urllib.error.HTTPError as e:
                print("HTTPError downloading", url, e.code)
                continue
            except urllib.error.URLError as e:
                # URLError carries ``reason``; only HTTPError has ``code``.
                print("URLError downloading", url, e.reason)
                continue
            with open(os.path.join(output_folder, name), "wb") as f:
                f.write(image_data)
            return True
        return False

    def downloader_thread():
        while True:
            name = q.get()
            try:
                if name is None:  # sentinel: no more work
                    return
                if not download_one(name):
                    print("error could not download", name)
            except Exception as e:
                # Keep the worker alive on unexpected errors (e.g. disk errors).
                print("error could not download", name, e)
            finally:
                # Always acknowledge the item, otherwise q.join() never returns.
                q.task_done()

    num_threads = min(20, len(unique_names))
    for _ in range(num_threads):
        threading.Thread(target=downloader_thread, daemon=True).start()

    for name in unique_names:
        q.put(name)
    q.join()

    # One sentinel per worker so that every thread exits.
    for _ in range(num_threads):
        q.put(None)
    q.join()


def generate_achievement_stats(client, game_id, output_directory, backup_directory):
    """Generate achievement/stat files for ``game_id`` and fetch the icons.

    The stats schema is requested on behalf of each id in ``TOP_OWNER_IDS``
    followed by the logged-in account, stopping at the first non-empty schema.
    The raw schema is saved to ``backup_directory``, the generated files go to
    ``output_directory`` and icons to its ``achievement_images`` sub-folder.
    """
    achievement_images_dir = os.path.join(output_directory, "achievement_images")
    images_to_download = []
    steam_id_list = TOP_OWNER_IDS + [client.steam_id]

    for steam_id in steam_id_list:
        out = get_stats_schema(client, game_id, steam_id)
        if out is None or len(out.body.schema) == 0:
            continue

        # Name the backup after the game actually requested (the parameter),
        # not after whatever module-level loop variable happens to exist.
        schema_path = os.path.join(backup_directory, 'UserGameStatsSchema_{}.bin'.format(game_id))
        with open(schema_path, 'wb') as f:
            f.write(out.body.schema)

        achievements, _stats = achievements_gen.generate_stats_achievements(out.body.schema, output_directory)
        for ach in achievements:
            if "icon" in ach:
                images_to_download.append(ach["icon"])
            if "icon_gray" in ach:
                images_to_download.append(ach["icon_gray"])
        break

    if images_to_download:
        os.makedirs(achievement_images_dir, exist_ok=True)
        download_achievement_images(game_id, images_to_download, achievement_images_dir)


def get_ugc_info(client, published_file_id):
    """Request the ``PublishedFile.GetDetails`` record for one published file."""
    return client.send_um_and_wait('PublishedFile.GetDetails#1', {
        'publishedfileids': [published_file_id],
        'includetags': False,
        'includeadditionalpreviews': False,
        'includechildren': False,
        'includekvtags': False,
        'includevotes': False,
        'short_description': True,
        'includeforsaledata': False,
        'includemetadata': False,
        'language': 0
    })


def download_published_file(client, published_file_id, backup_directory):
    """Download a published file and store it, plus its details, in ``backup_directory``.

    Returns the downloaded bytes, or ``None`` when the details could not be
    obtained or the download failed (the reason is printed).
    """
    ugc_info = get_ugc_info(client, published_file_id)
    if ugc_info is None:
        print("failed getting published file", published_file_id)
        return None

    details_list = ugc_info.body.publishedfiledetails
    if len(details_list) == 0:
        print("failed getting published file", published_file_id)
        return None

    file_details = details_list[0]
    if file_details.result != EResult.OK:
        print("failed getting published file", published_file_id, file_details.result)
        return None

    os.makedirs(backup_directory, exist_ok=True)

    # The details may contain non-ASCII text; do not depend on the locale encoding.
    with open(os.path.join(backup_directory, "info.txt"), "w", encoding="utf-8") as f:
        f.write(str(ugc_info.body))

    try:
        with urllib.request.urlopen(file_details.file_url) as response:
            data = response.read()
    except urllib.error.HTTPError as e:
        print("HTTPError getting", file_details.file_url, e.code)
        return None
    except (urllib.error.URLError, ValueError) as e:
        # ValueError: urlopen rejects an empty or malformed URL.
        print("error getting", file_details.file_url, e)
        return None

    # Flatten path separators so the file always lands inside backup_directory.
    safe_name = file_details.filename.replace("/", "_").replace("\\", "_")
    with open(os.path.join(backup_directory, safe_name), "wb") as f:
        f.write(data)
    return data


def get_inventory_info(client, game_id):
    """Request the ``Inventory.GetItemDefMeta`` record for ``game_id``."""
    return client.send_um_and_wait('Inventory.GetItemDefMeta#1', {
        'appid': game_id
    })


def generate_inventory(client, game_id):
    """Return the raw item-definition archive of ``game_id``, or ``None``.

    ``None`` is returned when the metadata request gets no answer or a non-OK
    result, or when the archive download fails (the reason is printed).
    """
    inventory = get_inventory_info(client, game_id)
    if inventory is None or inventory.header.eresult != EResult.OK:
        return None

    url = "https://api.steampowered.com/IGameInventory/GetItemDefArchive/v0001?appid={}&digest={}".format(game_id, inventory.body.digest)
    try:
        with urllib.request.urlopen(url) as response:
            return response.read()
    except urllib.error.HTTPError as e:
        print("HTTPError getting", url, e.code)
    except urllib.error.URLError as e:
        # URLError carries ``reason``; only HTTPError has ``code``.
        print("URLError getting", url, e.reason)
    return None


def get_dlc(raw_infos):
    """Collect DLC app ids and depot source app ids from a product-info dict.

    Returns ``(dlc_ids, depot_app_ids)`` as two sets of ints.  DLC ids come
    from ``extended/listofdlc`` and from each depot's ``dlcappid``; depot app
    ids come from each depot's ``depotfromapp``.  On any unexpected structure
    a message is printed and two empty sets are returned.
    """
    try:
        try:
            listed = raw_infos["extended"]["listofdlc"]
            dlc_list = {int(part) for part in listed.split(",") if part.strip()}
        except (KeyError, TypeError, ValueError, AttributeError):
            dlc_list = set()

        depot_app_list = set()
        if "depots" in raw_infos:
            depots = raw_infos["depots"]
            for dep in depots:
                depot_info = depots[dep]
                if "dlcappid" in depot_info:
                    dlc_list.add(int(depot_info["dlcappid"]))
                if "depotfromapp" in depot_info:
                    depot_app_list.add(int(depot_info["depotfromapp"]))
        return (dlc_list, depot_app_list)
    except Exception:
        print("could not get dlc infos, are there any dlcs ?")
        return (set(), set())


def _download_controller_config(client, config_id, details, backup_directory):
    """Back up one controller config; return (controller_type, enabled_branches, data)."""
    controller_type = details["controller_type"] if "controller_type" in details else ""
    enabled_branches = details["enabled_branches"] if "enabled_branches" in details else ""
    print(config_id, controller_type)
    data = download_published_file(
        client, int(config_id),
        os.path.join(backup_directory, controller_type + str(config_id)))
    return controller_type, enabled_branches, data


def _stringify_item(item):
    """Return a copy of an item definition with every value rendered as a string."""
    converted = {}
    for key, value in item.items():
        if value is True:
            converted[key] = "true"
        elif value is False:
            converted[key] = "false"
        else:
            converted[key] = str(value)
    return converted


for appid in appids:
    backup_dir = os.path.join("backup", "{}".format(appid))
    out_dir = os.path.join("{}".format("{}_output".format(appid)), "steam_settings")

    os.makedirs(backup_dir, exist_ok=True)
    os.makedirs(out_dir, exist_ok=True)

    print("outputting config to", out_dir)

    raw = client.get_product_info(apps=[appid])
    if not raw or appid not in raw.get("apps", {}):
        print("could not get product info for", appid)
        continue
    game_info = raw["apps"][appid]

    if "common" in game_info:
        game_info_common = game_info["common"]
        #if "community_visible_stats" in game_info_common: #NOTE: checking this seems to skip stats on a few games so it's commented out
        generate_achievement_stats(client, appid, out_dir, backup_dir)
        if "supported_languages" in game_info_common:
            with open(os.path.join(out_dir, "supported_languages.txt"), 'w') as f:
                languages = game_info_common["supported_languages"]
                for l in languages:
                    if "supported" in languages[l] and languages[l]["supported"] == "true":
                        f.write("{}\n".format(l))

    with open(os.path.join(out_dir, "steam_appid.txt"), 'w') as f:
        f.write(str(appid))

    # Write the build id of the public branch when the product info has one.
    try:
        buildid = game_info["depots"]["branches"]["public"]["buildid"]
    except (KeyError, TypeError):
        pass
    else:
        with open(os.path.join(out_dir, "build_id.txt"), 'w') as f:
            f.write(str(buildid))

    dlc_config_list = []
    dlc_list, depot_app_list = get_dlc(game_info)
    dlc_infos_backup = ""
    if len(dlc_list) > 0:
        dlc_response = client.get_product_info(apps=dlc_list)
        dlc_raw = dlc_response["apps"] if dlc_response else {}
        for dlc in dlc_raw:
            try:
                dlc_config_list.append((dlc, dlc_raw[dlc]["common"]["name"]))
            except (KeyError, TypeError):
                dlc_config_list.append((dlc, None))
        dlc_infos_backup = json.dumps(dlc_raw, indent=4)

    with open(os.path.join(out_dir, "DLC.txt"), 'w', encoding="utf-8") as f:
        for dlc_id, dlc_name in dlc_config_list:
            if dlc_name is not None:
                f.write("{}={}\n".format(dlc_id, dlc_name))

    config_generated = False
    if "config" in game_info:
        if "steamcontrollerconfigdetails" in game_info["config"]:
            controller_details = game_info["config"]["steamcontrollerconfigdetails"]
            for config_id in controller_details:
                controller_type, enabled_branches, out_vdf = _download_controller_config(
                    client, config_id, controller_details[config_id], backup_dir)
                # Only the first xbox-style config enabled on the default or
                # public branch is turned into the generated controller config.
                if out_vdf is not None and not config_generated:
                    if (controller_type in ["controller_xbox360", "controller_xboxone"] and
                            (("default" in enabled_branches) or ("public" in enabled_branches))):
                        parse_controller_vdf.generate_controller_config(out_vdf.decode('utf-8'), os.path.join(out_dir, "controller"))
                        config_generated = True
        if "steamcontrollertouchconfigdetails" in game_info["config"]:
            controller_details = game_info["config"]["steamcontrollertouchconfigdetails"]
            for config_id in controller_details:
                # Touch configs are only backed up, never converted.
                _download_controller_config(
                    client, config_id, controller_details[config_id], backup_dir)

    inventory_data = generate_inventory(client, appid)
    if inventory_data is not None:
        out_inventory = {}
        default_items = {}
        # The archive may be padded with trailing NUL bytes; strip them before parsing.
        inventory = json.loads(inventory_data.rstrip(b"\x00"))
        raw_inventory = json.dumps(inventory, indent=4)
        with open(os.path.join(backup_dir, "inventory.json"), "w") as f:
            f.write(raw_inventory)
        for item in inventory:
            index = str(item["itemdefid"])
            out_inventory[index] = _stringify_item(item)
            default_items[index] = 1

        out_json_inventory = json.dumps(out_inventory, indent=2)
        with open(os.path.join(out_dir, "items.json"), "w") as f:
            f.write(out_json_inventory)
        out_json_inventory = json.dumps(default_items, indent=2)
        with open(os.path.join(out_dir, "default_items.json"), "w") as f:
            f.write(out_json_inventory)

    game_info_backup = json.dumps(game_info, indent=4)
    with open(os.path.join(backup_dir, "product_info.json"), "w") as f:
        f.write(game_info_backup)
    with open(os.path.join(backup_dir, "dlc_product_info.json"), "w") as f:
        f.write(dlc_infos_backup)