#!/usr/bin/python # -*- coding: utf-8 -*- import flask from flask import Flask, render_template from flask_googlemaps import GoogleMaps from flask_googlemaps import Map from flask_googlemaps import icons import os import re import sys import struct import json import requests import argparse import getpass import threading import werkzeug.serving import pokemon_pb2 import time from google.protobuf.internal import encoder from google.protobuf.message import DecodeError from s2sphere import * from datetime import datetime from geopy.geocoders import GoogleV3 from gpsoauth import perform_master_login, perform_oauth from geopy.exc import GeocoderTimedOut, GeocoderServiceError from requests.packages.urllib3.exceptions import InsecureRequestWarning from requests.adapters import ConnectionError from requests.models import InvalidURL from transform import * requests.packages.urllib3.disable_warnings(InsecureRequestWarning) API_URL = 'https://pgorelease.nianticlabs.com/plfe/rpc' LOGIN_URL = \ 'https://sso.pokemon.com/sso/login?service=https://sso.pokemon.com/sso/oauth2.0/callbackAuthorize' LOGIN_OAUTH = 'https://sso.pokemon.com/sso/oauth2.0/accessToken' APP = 'com.nianticlabs.pokemongo' with open('credentials.json') as file: credentials = json.load(file) PTC_CLIENT_SECRET = credentials.get('ptc_client_secret', None) ANDROID_ID = credentials.get('android_id', None) SERVICE = credentials.get('service', None) CLIENT_SIG = credentials.get('client_sig', None) GOOGLEMAPS_KEY = credentials.get('gmaps_key', None) SESSION = requests.session() SESSION.headers.update({'User-Agent': 'Niantic App'}) SESSION.verify = False global_password = None global_token = None access_token = None DEBUG = True VERBOSE_DEBUG = False # if you want to write raw request/response to the console COORDS_LATITUDE = 0 COORDS_LONGITUDE = 0 COORDS_ALTITUDE = 0 FLOAT_LAT = 0 FLOAT_LONG = 0 NEXT_LAT = 0 NEXT_LONG = 0 auto_refresh = 0 default_step = 0.001 api_endpoint = None pokemons = {} gyms = {} pokestops = {} numbertoteam = { # At least I'm pretty sure that's it. I could be wrong and then I'd be displaying the wrong owner team of gyms. 0: 'Gym', 1: 'Mystic', 2: 'Valor', 3: 'Instinct', } origin_lat, origin_lon = None, None is_ampm_clock = False # stuff for in-background search thread search_thread = None def memoize(obj): cache = obj.cache = {} @functools.wraps(obj) def memoizer(*args, **kwargs): key = str(args) + str(kwargs) if key not in cache: cache[key] = obj(*args, **kwargs) return cache[key] return memoizer def parse_unicode(bytestring): decoded_string = bytestring.decode(sys.getfilesystemencoding()) return decoded_string def debug(message): if DEBUG: print '[-] {}'.format(message) def time_left(ms): s = ms / 1000 (m, s) = divmod(s, 60) (h, m) = divmod(m, 60) return (h, m, s) def encode(cellid): output = [] encoder._VarintEncoder()(output.append, cellid) return ''.join(output) def getNeighbors(): origin = CellId.from_lat_lng(LatLng.from_degrees(FLOAT_LAT, FLOAT_LONG)).parent(15) walk = [origin.id()] # 10 before and 10 after next = origin.next() prev = origin.prev() for i in range(10): walk.append(prev.id()) walk.append(next.id()) next = next.next() prev = prev.prev() return walk def f2i(float): return struct.unpack('= 1: req1.MergeFrom(reqq[0]) req2 = req.requests.add() req2.type = 126 if len(reqq) >= 2: req2.MergeFrom(reqq[1]) req3 = req.requests.add() req3.type = 4 if len(reqq) >= 3: req3.MergeFrom(reqq[2]) req4 = req.requests.add() req4.type = 129 if len(reqq) >= 4: req4.MergeFrom(reqq[3]) req5 = req.requests.add() req5.type = 5 if len(reqq) >= 5: req5.MergeFrom(reqq[4]) return retrying_api_req(service, api, access_token, req, useauth=useauth) def login_google(username, password): print '[!] Google login for: {}'.format(username) r1 = perform_master_login(username, password, ANDROID_ID) r2 = perform_oauth(username, r1.get('Token', ''), ANDROID_ID, SERVICE, APP, CLIENT_SIG, ) return r2.get('Auth') def login_ptc(username, password): print '[!] PTC login for: {}'.format(username) head = {'User-Agent': 'Niantic App'} r = SESSION.get(LOGIN_URL, headers=head) if r is None: return render_template('nope.html', fullmap=fullmap) try: jdata = json.loads(r.content) except ValueError, e: debug('login_ptc: could not decode JSON from {}'.format(r.content)) return None # Maximum password length is 15 (sign in page enforces this limit, API does not) if len(password) > 15: print '[!] Trimming password to 15 characters' password = password[:15] data = { 'lt': jdata['lt'], 'execution': jdata['execution'], '_eventId': 'submit', 'username': username, 'password': password, } r1 = SESSION.post(LOGIN_URL, data=data, headers=head) ticket = None try: ticket = re.sub('.*ticket=', '', r1.history[0].headers['Location']) except Exception, e: if DEBUG: print r1.json()['errors'][0] return None data1 = { 'client_id': 'mobile-app_pokemon-go', 'redirect_uri': 'https://www.nianticlabs.com/pokemongo/error', 'client_secret': PTC_CLIENT_SECRET, 'grant_type': 'refresh_token', 'code': ticket, } r2 = SESSION.post(LOGIN_OAUTH, data=data1) access_token = re.sub('&expires.*', '', r2.content) access_token = re.sub('.*access_token=', '', access_token) return access_token def get_heartbeat(service, api_endpoint, access_token, response, ): m4 = pokemon_pb2.RequestEnvelop.Requests() m = pokemon_pb2.RequestEnvelop.MessageSingleInt() m.f1 = int(time.time() * 1000) m4.message = m.SerializeToString() m5 = pokemon_pb2.RequestEnvelop.Requests() m = pokemon_pb2.RequestEnvelop.MessageSingleString() m.bytes = '05daf51635c82611d1aac95c0b051d3ec088a930' m5.message = m.SerializeToString() walk = sorted(getNeighbors()) m1 = pokemon_pb2.RequestEnvelop.Requests() m1.type = 106 m = pokemon_pb2.RequestEnvelop.MessageQuad() m.f1 = ''.join(map(encode, walk)) m.f2 = \ "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" m.lat = COORDS_LATITUDE m.long = COORDS_LONGITUDE m1.message = m.SerializeToString() response = get_profile(service, access_token, api_endpoint, response.unknown7, m1, pokemon_pb2.RequestEnvelop.Requests(), m4, pokemon_pb2.RequestEnvelop.Requests(), m5, ) try: payload = response.payload[0] except (AttributeError, IndexError): return heartbeat = pokemon_pb2.ResponseEnvelop.HeartbeatPayload() heartbeat.ParseFromString(payload) return heartbeat def get_token(service, username, password): """ Get token if it's not None :return: :rtype: """ global global_token if global_token is None: if service == 'ptc': global_token = login_ptc(username, password) else: global_token = login_google(username, password) return global_token else: return global_token def get_args(): parser = argparse.ArgumentParser() parser.add_argument( '-a', '--auth_service', type=str.lower, help='Auth Service', default='ptc') parser.add_argument('-u', '--username', help='Username', required=True) parser.add_argument('-p', '--password', help='Password', required=False) parser.add_argument( '-l', '--location', type=parse_unicode, help='Location', required=True) parser.add_argument('-st', '--step-limit', help='Steps', required=True) group = parser.add_mutually_exclusive_group(required=False) group.add_argument( '-i', '--ignore', help='Comma-separated list of Pokémon names to ignore') group.add_argument( '-o', '--only', help='Comma-separated list of Pokémon names to search') parser.add_argument( "-ar", "--auto_refresh", help="Enables an autorefresh that behaves the same as a page reload. " + "Needs an integer value for the amount of seconds") parser.add_argument( '-dp', '--display-pokestop', help='Display pokéstop', action='store_true', default=False) parser.add_argument( '-dg', '--display-gym', help='Display Gym', action='store_true', default=False) parser.add_argument( '-H', '--host', help='Set web server listening host', default='127.0.0.1') parser.add_argument( '-P', '--port', type=int, help='Set web server listening port', default=5000) parser.add_argument( "-L", "--locale", help="Locale for Pokemon names: default en, check locale folder for more options", default="en") parser.add_argument( "-ol", "--onlylure", help='Display only lured pokéstop', action='store_true') parser.add_argument( '-c', '--china', help='Coordinates transformer for China', action='store_true') parser.add_argument( "-pm", "--ampm_clock", help="Toggles the AM/PM clock for Pokemon timers", action='store_true', default=False) parser.add_argument( '-d', '--debug', help='Debug Mode', action='store_true') parser.set_defaults(DEBUG=True) return parser.parse_args() @memoize def login(args): global global_password if not global_password: if args.password: global_password = args.password else: global_password = getpass.getpass() access_token = get_token(args.auth_service, args.username, global_password) if access_token is None: raise Exception('[-] Wrong username/password') print '[+] RPC Session Token: {} ...'.format(access_token[:25]) api_endpoint = get_api_endpoint(args.auth_service, access_token) if api_endpoint is None: raise Exception('[-] RPC server offline') print '[+] Received API endpoint: {}'.format(api_endpoint) profile_response = retrying_get_profile(args.auth_service, access_token, api_endpoint, None) if profile_response is None or not profile_response.payload: raise Exception('Could not get profile') print '[+] Login successful' payload = profile_response.payload[0] profile = pokemon_pb2.ResponseEnvelop.ProfilePayload() profile.ParseFromString(payload) print '[+] Username: {}'.format(profile.profile.username) creation_time = \ datetime.fromtimestamp(int(profile.profile.creation_time) / 1000) print '[+] You started playing Pokemon Go on: {}'.format( creation_time.strftime('%Y-%m-%d %H:%M:%S')) for curr in profile.profile.currency: print '[+] {}: {}'.format(curr.type, curr.amount) return api_endpoint, access_token, profile_response def main(): full_path = os.path.realpath(__file__) (path, filename) = os.path.split(full_path) args = get_args() if args.auth_service not in ['ptc', 'google']: print '[!] Invalid Auth service specified' return print('[+] Locale is ' + args.locale) pokemonsJSON = json.load( open(path + '/locales/pokemon.' + args.locale + '.json')) if args.debug: global DEBUG DEBUG = True print '[!] DEBUG mode on' # only get location for first run if not (FLOAT_LAT and FLOAT_LONG): print('[+] Getting initial location') retrying_set_location(args.location) if args.auto_refresh: global auto_refresh auto_refresh = int(args.auto_refresh) * 1000 if args.ampm_clock: global is_ampm_clock is_ampm_clock = True api_endpoint, access_token, profile_response = login(args) clear_stale_pokemons() steplimit = int(args.step_limit) ignore = [] only = [] if args.ignore: ignore = [i.lower().strip() for i in args.ignore.split(',')] elif args.only: only = [i.lower().strip() for i in args.only.split(',')] pos = 1 x = 0 y = 0 dx = 0 dy = -1 steplimit2 = steplimit**2 for step in range(steplimit2): #starting at 0 index debug('looping: step {} of {}'.format((step+1), steplimit**2)) #debug('steplimit: {} x: {} y: {} pos: {} dx: {} dy {}'.format(steplimit2, x, y, pos, dx, dy)) # Scan location math if -steplimit2 / 2 < x <= steplimit2 / 2 and -steplimit2 / 2 < y <= steplimit2 / 2: set_location_coords(x * 0.0025 + origin_lat, y * 0.0025 + origin_lon, 0) if x == y or x < 0 and x == -y or x > 0 and x == 1 - y: (dx, dy) = (-dy, dx) (x, y) = (x + dx, y + dy) process_step(args, api_endpoint, access_token, profile_response, pokemonsJSON, ignore, only) print('Completed: ' + str( ((step+1) + pos * .25 - .25) / (steplimit2) * 100) + '%') global NEXT_LAT, NEXT_LONG if (NEXT_LAT and NEXT_LONG and (NEXT_LAT != FLOAT_LAT or NEXT_LONG != FLOAT_LONG)): print('Update to next location %f, %f' % (NEXT_LAT, NEXT_LONG)) set_location_coords(NEXT_LAT, NEXT_LONG, 0) NEXT_LAT = 0 NEXT_LONG = 0 else: set_location_coords(origin_lat, origin_lon, 0) register_background_thread() def process_step(args, api_endpoint, access_token, profile_response, pokemonsJSON, ignore, only): print('[+] Searching for Pokémon at location {} {}'.format(FLOAT_LAT, FLOAT_LONG)) origin = LatLng.from_degrees(FLOAT_LAT, FLOAT_LONG) step_lat = FLOAT_LAT step_long = FLOAT_LONG parent = CellId.from_lat_lng(LatLng.from_degrees(FLOAT_LAT, FLOAT_LONG)).parent(15) h = get_heartbeat(args.auth_service, api_endpoint, access_token, profile_response) hs = [h] seen = set([]) for child in parent.children(): latlng = LatLng.from_point(Cell(child).get_center()) set_location_coords(latlng.lat().degrees, latlng.lng().degrees, 0) hs.append( get_heartbeat(args.auth_service, api_endpoint, access_token, profile_response)) set_location_coords(step_lat, step_long, 0) visible = [] for hh in hs: try: for cell in hh.cells: for wild in cell.WildPokemon: hash = wild.SpawnPointId + ':' \ + str(wild.pokemon.PokemonId) if hash not in seen: visible.append(wild) seen.add(hash) if cell.Fort: for Fort in cell.Fort: if Fort.Enabled == True: if args.china: (Fort.Latitude, Fort.Longitude) = \ transform_from_wgs_to_gcj(Location(Fort.Latitude, Fort.Longitude)) if Fort.GymPoints and args.display_gym: gyms[Fort.FortId] = [Fort.Team, Fort.Latitude, Fort.Longitude, Fort.GymPoints] elif Fort.FortType \ and args.display_pokestop: expire_time = 0 if Fort.LureInfo.LureExpiresTimestampMs: expire_time = datetime\ .fromtimestamp(Fort.LureInfo.LureExpiresTimestampMs / 1000.0)\ .strftime("%H:%M:%S") if (expire_time != 0 or not args.onlylure): pokestops[Fort.FortId] = [Fort.Latitude, Fort.Longitude, expire_time] except AttributeError: break for poke in visible: pokename = pokemonsJSON[str(poke.pokemon.PokemonId)] if args.ignore: if pokename.lower() in ignore: continue elif args.only: if pokename.lower() not in only: continue disappear_timestamp = time.time() + poke.TimeTillHiddenMs \ / 1000 if args.china: (poke.Latitude, poke.Longitude) = \ transform_from_wgs_to_gcj(Location(poke.Latitude, poke.Longitude)) pokemons[poke.SpawnPointId] = { "lat": poke.Latitude, "lng": poke.Longitude, "disappear_time": disappear_timestamp, "id": poke.pokemon.PokemonId, "name": pokename } def clear_stale_pokemons(): current_time = time.time() for pokemon_key in pokemons.keys(): pokemon = pokemons[pokemon_key] if current_time > pokemon['disappear_time']: print "[+] removing stale pokemon %s at %f, %f from list" % ( pokemon['name'].encode('utf-8'), pokemon['lat'], pokemon['lng']) del pokemons[pokemon_key] def register_background_thread(initial_registration=False): """ Start a background thread to search for Pokemon while Flask is still able to serve requests for the map :param initial_registration: True if first registration and thread should start immediately, False if it's being called by the finishing thread to schedule a refresh :return: None """ debug('register_background_thread called') global search_thread if initial_registration: if not werkzeug.serving.is_running_from_reloader(): debug( 'register_background_thread: not running inside Flask so not starting thread') return if search_thread: debug( 'register_background_thread: initial registration requested but thread already running') return debug('register_background_thread: initial registration') search_thread = threading.Thread(target=main) else: debug('register_background_thread: queueing') search_thread = threading.Timer(30, main) # delay, in seconds search_thread.daemon = True search_thread.name = 'search_thread' search_thread.start() def create_app(): app = Flask(__name__, template_folder='templates') GoogleMaps(app, key=GOOGLEMAPS_KEY) return app app = create_app() @app.route('/data') def data(): """ Gets all the PokeMarkers via REST """ return json.dumps(get_pokemarkers()) @app.route('/raw_data') def raw_data(): """ Gets raw data for pokemons/gyms/pokestops via REST """ return flask.jsonify(pokemons=pokemons, gyms=gyms, pokestops=pokestops) @app.route('/config') def config(): """ Gets the settings for the Google Maps via REST""" center = { 'lat': FLOAT_LAT, 'lng': FLOAT_LONG, 'zoom': 15, 'identifier': "fullmap" } return json.dumps(center) @app.route('/') def fullmap(): clear_stale_pokemons() return render_template( 'example_fullmap.html', key=GOOGLEMAPS_KEY, fullmap=get_map(), auto_refresh=auto_refresh) @app.route('/next_loc') def next_loc(): global NEXT_LAT, NEXT_LONG lat = flask.request.args.get('lat', '') lon = flask.request.args.get('lon', '') if not (lat and lon): print('[-] Invalid next location: %s,%s' % (lat, lon)) else: print('[+] Saved next location as %s,%s' % (lat, lon)) NEXT_LAT = float(lat) NEXT_LONG = float(lon) return 'ok' def get_pokemarkers(): pokeMarkers = [{ 'icon': icons.dots.red, 'lat': origin_lat, 'lng': origin_lon, 'infobox': "Start position", 'type': 'custom', 'key': 'start-position', 'disappear_time': -1 }] for pokemon_key in pokemons: pokemon = pokemons[pokemon_key] datestr = datetime.fromtimestamp(pokemon[ 'disappear_time']) dateoutput = datestr.strftime("%H:%M:%S") if is_ampm_clock: dateoutput = datestr.strftime("%I:%M%p").lstrip('0') pokemon['disappear_time_formatted'] = dateoutput LABEL_TMPL = u'''
{name} - #{id}
Disappears at - {disappear_time_formatted}
Get Directions
''' label = LABEL_TMPL.format(**pokemon) # NOTE: `infobox` field doesn't render multiple line string in frontend label = label.replace('\n', '') pokeMarkers.append({ 'type': 'pokemon', 'key': pokemon_key, 'disappear_time': pokemon['disappear_time'], 'icon': 'static/icons/%d.png' % pokemon["id"], 'lat': pokemon["lat"], 'lng': pokemon["lng"], 'infobox': label }) for gym_key in gyms: gym = gyms[gym_key] if gym[0] == 0: color = "rgba(0,0,0,.4)" if gym[0] == 1: color = "rgba(74, 138, 202, .6)" if gym[0] == 2: color = "rgba(240, 68, 58, .6)" if gym[0] == 3: color = "rgba(254, 217, 40, .6)" icon = 'static/forts/'+numbertoteam[gym[0]]+'_large.png' pokeMarkers.append({ 'icon': 'static/forts/' + numbertoteam[gym[0]] + '.png', 'type': 'gym', 'key': gym_key, 'disappear_time': -1, 'lat': gym[1], 'lng': gym[2], 'infobox': "
Gym owned by:
Team " + numbertoteam[gym[0]] + "

Prestige: " + str(gym[3]) + "
" }) for stop_key in pokestops: stop = pokestops[stop_key] if stop[2] > 0: pokeMarkers.append({ 'type': 'lured_stop', 'key': stop_key, 'disappear_time': -1, 'icon': 'static/forts/PstopLured.png', 'lat': stop[0], 'lng': stop[1], 'infobox': 'Lured Pokestop, expires at ' + stop[2], }) else: pokeMarkers.append({ 'type': 'stop', 'key': stop_key, 'disappear_time': -1, 'icon': 'static/forts/Pstop.png', 'lat': stop[0], 'lng': stop[1], 'infobox': 'Pokestop', }) return pokeMarkers def get_map(): fullmap = Map( identifier="fullmap2", style='height:100%;width:100%;top:0;left:0;position:absolute;z-index:200;', lat=origin_lat, lng=origin_lon, markers=get_pokemarkers(), zoom='15', ) return fullmap if __name__ == '__main__': args = get_args() register_background_thread(initial_registration=True) app.run(debug=True, threaded=True, host=args.host, port=args.port)