Details for leaflet-map.ipynb

Published by gedankenstuecke

Description

Get an interactive map of your Overland location data

0

Tags & Data Sources

location gps Overland connection

Comments

Please log in to comment.

Notebook
Last updated 3 years, 10 months ago

Testing interactive Maps

This notebook requires Overland data from https://overland.openhumans.org and currently uses only April 2020 as its data source.

Notebook
Last updated 3 years, 10 months ago

Testing interactive Maps

This notebook requires Overland data from https://overland.openhumans.org and currently uses only April 2020 as its data source.

In [1]:
import requests
import geojson
import geohash
import osm2geojson
import pandas as pd
import geopandas as gpd
import openlocationcode.openlocationcode as olc
from shapely import wkt
from geopy.geocoders import Nominatim
from ipywidgets import Text, HTML, Layout
from ipyleaflet.leaflet import ControlException
from ipyleaflet import (Map, Marker, GeoJSON, GeoData, WidgetControl,
                        FullScreenControl, LayersControl, ZoomControl, basemaps)

import re
import os
import time
import json
import urllib
import datetime
import textwrap
import threading
In [2]:
def is_urlencoded(text):
    """Tell whether ``text`` carries (at least one layer of) URL-encoding.

    The check percent-decodes the string once and compares the result with
    the input: a difference means the input was encoded. A ``True`` result
    does not imply the decoded string is free of further encoding layers,
    so callers that need a fully decoded string must repeat the check.
    """
    decoded_once = urllib.parse.unquote(text)
    return decoded_once != text
In [3]:
# Provide a default identifier (``countries``) for a shape that can be used
# in demos: the world country outlines, downloaded once when this cell runs.
# NOTE(review): presumably meant to be typed into the search field, since the
# controller looks up entered text in the namespace it is given - confirm.
url = 'https://raw.githubusercontent.com/johan/world.geo.json/master/countries.geo.json'
countries = geojson.loads(requests.get(url).content)
del url  # drop the temporary name so only ``countries`` stays in the namespace
In [4]:
def bbox(coord_list):
    """Find the bounding box for a collection of coordinates.

    Args:
        coord_list: Iterable of coordinates; only the first two components
            of each coordinate are considered. One-shot iterables such as
            generators are accepted.

    Returns:
        ``[ll, ur]`` where ``ll`` is the lower-left corner (the minimum of
        the first and of the second component) and ``ur`` the upper-right
        corner (the respective maxima), each as a two-element list.

    Raises:
        IndexError: If ``coord_list`` contains no coordinates.
    """
    # Materialise once so that generators can be scanned for both axes.
    points = list(coord_list)
    if not points:
        raise IndexError('bbox() needs at least one coordinate')

    firsts = [point[0] for point in points]
    seconds = [point[1] for point in points]
    # min()/max() are linear; sorting the whole list per axis is not needed.
    ll = [min(firsts), min(seconds)]
    ur = [max(firsts), max(seconds)]
    return [ll, ur]
In [5]:
def overpass(query: str, fmt='json', url=None):
    """
    Run a query on OSM Overpass API and return an XML string or a JSON/GeoJSON object.

    The query is prefixed with the matching ``[out:...]`` setting before it
    is sent, so it must not contain one itself.

    Args:
        query: Overpass QL statements (without the ``[out:...]`` setting).
        fmt: ``'json'`` (parsed JSON object), ``'geojson'`` (JSON response
            converted to a GeoJSON object) or ``'xml'`` (raw response text).
        url: Interpreter endpoint; the default server/path will be
            ``https://overpass-api.de/api/interpreter``.

    Returns:
        The parsed JSON object, a GeoJSON object or the XML text, depending
        on ``fmt``.

    Raises:
        ValueError: If ``fmt`` is not supported, or if the server answers
            with an HTTP error status (same convention as ``wikidata()``).
    """
    if fmt in ('json', 'geojson'):
        data = '[out:json];' + query
    elif fmt == 'xml':
        data = '[out:xml];' + query
    else:
        raise ValueError('Format must be one of: json xml geojson')

    url = url or 'https://overpass-api.de/api/interpreter'
    resp = requests.get(url, params={'data': data})
    # Fail loudly on HTTP errors instead of handing an error page to the
    # JSON parser or returning it to the caller as if it were XML.
    if resp.status_code >= 400:
        raise ValueError(resp.reason)

    text = resp.text
    if fmt == 'json':
        return json.loads(text)
    if fmt == 'geojson':
        gj = osm2geojson.json2geojson(text)
        # round-trip through JSON to get proper ``geojson`` objects
        return geojson.loads(json.dumps(gj))
    return text
In [6]:
def wikidata(query: str, url=None):
    """
    Send a SparQL query to the Wikidata query service and return its JSON result.

    The query travels as a single line of text, so it must not contain
    comments introduced by the # character - they lead to a bad request.
    A query that arrives URL-encoded (possibly several times) is decoded
    until no encoding is left.

    If no ``url`` is given, ``https://query.wikidata.org/sparql`` is used.
    A response with an HTTP status of 400 or above raises ``ValueError``
    carrying the response's reason phrase.
    """
    endpoint = url if url else 'https://query.wikidata.org/sparql'

    # peel off every layer of URL-encoding
    sparql = query
    while is_urlencoded(sparql):
        sparql = urllib.parse.unquote(sparql)

    response = requests.get(
        endpoint,
        params={'query': sparql, 'format': 'json'},
        headers={'User-agent': 'Mozilla/5.0'},
    )
    if response.status_code >= 400:
        raise ValueError(response.reason)
    return response.json()
In [7]:
def find_wktdata(js):
    """
    Return the first variable name of type ``geosparql#wktLiteral`` from a SparQL JSON result.

    Result rows (bindings) are inspected in order, and within a row the
    variables are tried in the order given in the result head. A variable
    that is missing from a row (SparQL JSON leaves out unbound variables)
    is skipped instead of raising.

    Args:
        js: A SparQL JSON result with ``head.vars`` and ``results.bindings``.

    Returns:
        The variable name, or ``None`` if there are no rows or no row
        contains a value with the WKT literal datatype.
    """
    wkt_literal = 'http://www.opengis.net/ont/geosparql#wktLiteral'
    var_names = js['head']['vars']
    for binding in js['results']['bindings']:
        for var in var_names:
            if binding.get(var, {}).get('datatype') == wkt_literal:
                return var
    return None
In [8]:
class MapController:
    """
    A map controller executing text entries in a search field on a map.

    Each submitted entry is interpreted by ``run_query()`` and the result
    is applied to the map: a location re-centers it, shape data is added as
    a new layer.

    TODO: The various functions accessing external sources should become
    some kind of plugins (e.g. Geocoding, OSM, Wikidata).
    """
    def __init__(self, a_map, a_text, test_input=None, geocoder=None, globs=None):
        """
        Wire the search text field to the map.

        Args:
            a_map: The map widget to control.
            a_text: The text widget whose submitted value is executed.
            test_input: Optional list of entries run by the ``/test``
                command; a built-in list of samples is used by default.
            geocoder: Optional geocoder with a ``geocode(text)`` method;
                defaults to Nominatim.
            globs: Optional namespace (name -> object) whose GeoJSON-like
                objects can be referenced by name in the search field.
        """
        self.geocoder = geocoder or Nominatim(user_agent="ipymaps")
        self.globs = globs or {}
        self.a_map = a_map
        self.a_text = a_text
        # name for the next layer to be added; '' means "use a timestamp"
        self.layer_name = ''
        self.a_text.on_submit(self.text_changed)

        # One sample entry per line; the split pattern also swallows the
        # indentation of the literal below.
        self.test_input = test_input or re.split(r'\s*\n\s*', textwrap.dedent('''
            Paris
            Buenos Aires
            52.5 13.4
            Hong Kong
            9F4MGC22+22
            https://raw.githubusercontent.com/johan/world.geo.json/master/countries.geo.json
            http://commons.wikimedia.org/data/main/Data:Canada.map
            http://overpass-api.de/api/interpreter?data=node["amenity"="post_box"](52.52, 13.35, 52.54, 13.45);
            https://query.wikidata.org/#SELECT DISTINCT ?airport ?airportLabel ?coor WHERE { ?airport wdt:P31 wd:Q1248784; ?range wd:Q31; wdt:P625 ?coor. SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } }
        '''.strip()))

        # setup status text field and controller
        self.status_tx = Text('', layout=Layout(width='500px'))
        self.status_wc = WidgetControl(widget=self.status_tx, position='bottomleft')

    def text_changed(self, widget):
        """
        This is called whenever the user hits enter in the search text field.

        The entry ``/test`` runs the test sequence. Any other entry is
        passed to ``run_query()``; a lat/lon tuple re-centers the map and
        is shown in the status line, a GeoDataFrame or GeoJSON-like object
        is added as a new layer. Entries yielding nothing are ignored.
        """
        # timestamp (UTC, seconds resolution) used as fallback layer name
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()[:19]

        # commands
        if widget.value == '/test':
            self.test()
            return

        # locations and shapes
        res = self.run_query(widget.value)
        if isinstance(res, tuple):
            self.a_map.center = res
            # show the status in a thread since show_status() sleeps
            t1 = threading.Thread(target=self.show_status, args=(str(res), 10))
            t1.start()
        elif isinstance(res, gpd.geodataframe.GeoDataFrame):
            name = self.layer_name or now
            gd = GeoData(geo_dataframe=res,
                   style={'color': 'black', 'fillColor': '#3366cc', 'opacity':0.05,
                          'weight':1.9, 'dashArray':'2', 'fillOpacity':0.6},
                   hover_style={'fillColor': 'red', 'fillOpacity': 0.2},
                   name=name)
            self.a_map.add_layer(gd)
            self.layer_name = ''
        elif hasattr(res, '__geo_interface__'):
            data = res
            # jump to center of data (skipped if there are no coordinates)
            coords = list(geojson.utils.coords(data))
            if coords:
                ll, ur = bbox(coords)
                center = [(ur[0]+ll[0])/2, (ur[1]+ll[1])/2]
                # bbox components are in coordinate order; the map center
                # is set in the reversed order
                self.a_map.center = list(reversed(center))
            name = self.layer_name or now
            self.a_map.add_layer(GeoJSON(data=data, name=name))
            self.layer_name = ''

    def add_osmdata(self, query, url=None):
        """Load a GeoJSON object created from an OSM Overpass query.

        Before execution ``out;`` is appended to the query and a
        ``{bounds}`` placeholder, if present, is replaced with the current
        bounds of the map (both corners concatenated). Because
        ``str.format`` is used for this, literal braces in the query must
        be doubled. No timeout or size limits are added to the query.

        Raises:
            ValueError: If the map has no bounds yet (presumably before it
                has been displayed - TODO confirm).
        """
        ll, ur = self.a_map.bounds
        b = ll + ur
        query = query + 'out;'
        query = query.format(bounds=b)
        return overpass(query, fmt='geojson', url=url)

    def add_wikidata(self, query):
        """Make a Wikidata SparQL query and return a GeoPandas DataFrame.

        The column holding WKT literals becomes the ``geometry`` column.
        Rows without a value in that column are dropped, other unbound
        variables become ``None``.

        Raises:
            ValueError: If the result contains no WKT literal column.
        """
        js = wikidata(query)
        wkt_col = find_wktdata(js)
        if wkt_col is None:
            raise ValueError('Query result has no geosparql#wktLiteral column')
        columns = js['head']['vars']
        # unbound variables are left out of a binding, hence the .get()
        rows = [[b.get(var, {}).get('value') for var in columns]
                for b in js['results']['bindings'] if wkt_col in b]
        df = pd.DataFrame(rows, columns=columns)
        df['geometry'] = df[wkt_col].apply(wkt.loads)
        df = df.drop(labels=[wkt_col], axis=1)
        return gpd.GeoDataFrame(df, geometry='geometry')

    def add_pluscode(self, query):
        """Convert a Google Pluscode (or Open Location Code) to a lat/lon.

        Returns ``(None, None)`` if ``query`` is not a valid code.
        """
        if not olc.isValid(query):
            return None, None
        lat, lon = olc.decode(query).latlng()
        return lat, lon

    def add_geojson(self, url):
        "Load a GeoJSON string from the given URL, local or remote."

        # load data
        if url.startswith('file://'):
            # context manager so the file handle is closed again
            with open(url[7:]) as fh:
                gj = fh.read()
        else:
            gj = requests.get(url).content
        return geojson.loads(gj)

    def add_geojson_map(self, url):
        """Load a map file from the given URL, local or remote.

        The file is expected to be JSON with the GeoJSON content stored
        under its top-level ``data`` key.
        """
        # load data
        if url.startswith('file://'):
            # context manager so the file handle is closed again
            with open(url[7:]) as fh:
                js = fh.read()
        else:
            js = requests.get(url).text
        data = json.loads(js)['data']
        return geojson.loads(json.dumps(data))

    def show_status(self, msg, period=3):
        """Add a temporary status line widget for some time to the map.

        This blocks for ``period`` seconds, so it is meant to be run in a
        thread of its own.
        """
        self.status_tx.value = msg
        try:
            self.a_map.add_control(self.status_wc)
        except ControlException:
            # best effort: adding the control failed, keep going
            pass
        time.sleep(period)
        try:
            self.a_map.remove_control(self.status_wc)
        except ControlException:
            # best effort: removing the control failed, nothing to do
            pass

    def test(self, period=5):
        """
        Execute a sequence of test input lines.

        Each line is put into the search field and executed, followed by
        a pause of ``period`` seconds.
        """
        for line in self.test_input:
            self.a_text.value = line
            self.text_changed(self.a_text)
            time.sleep(period)

    def run_query(self, text):
        """
        Run a query as given by the ``text`` string.

        The following interpretations are tried in this order:

        1. name of a GeoJSON-like object in the ``globs`` namespace
        2. Pluscode / Open Location Code
        3. latitude/longitude pair separated by any of ``/,;:`` or space
        4. OSM Overpass URL (query in the ``data`` parameter)
        5. Wikidata query URL (SparQL query in the URL fragment)
        6. URL ending in ``.geojson`` or ``.geo.json``
        7. URL ending in ``.map``
        8. address for the geocoder
        9. geohash

        For Overpass and Wikidata URLs an optional ``name`` URL parameter
        sets the name of the resulting layer.

        Returns:
            A ``(lat, lon)`` tuple, a GeoJSON object, a GeoDataFrame, or
            ``None`` if the text could not be interpreted.
        """
        text = text.strip()
        parsed_url = urllib.parse.urlparse(text)

        # GeoJSON objects in passed namespace
        if text in self.globs:
            obj = self.globs[text]
            if hasattr(obj, '__geo_interface__'):
                return obj

        # pluscodes (cheap pre-check on the first character only)
        try:
            if re.match(r'[A-Z0-9\+]', text):
                lat, lon = self.add_pluscode(text)
                if lat is not None and lon is not None:
                    return (lat, lon)
        except ValueError:
            pass

        # lat lon
        parts = [v for v in re.split('[/,;: ]', text) if v]
        if len(parts) == 2:
            try:
                return (float(parts[0]), float(parts[1]))
            except ValueError:
                pass

        # OSM Overpass Query
        if 'overpass' in parsed_url.netloc.lower():
            query_dict = urllib.parse.parse_qs(parsed_url.query)
            data = query_dict.get('data')
            if not data:
                # an Overpass URL without a query: nothing to run
                return None
            names = query_dict.get('name')
            if names:
                self.layer_name = names[0]
            url = urllib.parse.urlunparse([
                parsed_url.scheme, parsed_url.netloc, parsed_url.path, '', '', ''])
            # Depending on the Python version parse_qs() drops or keeps the
            # terminal ; so normalize to exactly one.
            statement = data[0].rstrip().rstrip(';') + ';'
            return self.add_osmdata(statement, url=url)

        # Wikidata SparQL Query
        if 'wikidata' in parsed_url.netloc.lower():
            if parsed_url.query:
                query_dict = urllib.parse.parse_qs(parsed_url.query)
                names = query_dict.get('name')
                if names:
                    self.layer_name = names[0]
            return self.add_wikidata(parsed_url.fragment)

        url = text
        path = parsed_url.path.lower()

        # URL pointing to some GeoJSON string
        if path.endswith(('.geojson', '.geo.json')):
            gj = self.add_geojson(url)
            self.layer_name = os.path.basename(url)
            return gj

        # URL pointing to some .map file containing GeoJSON
        if path.endswith('.map'):
            gj = self.add_geojson_map(url)
            self.layer_name = os.path.basename(url)
            return gj

        # address to run geolocation for
        loc = self.geocoder.geocode(text)
        if loc:
            return loc.latitude, loc.longitude

        # try decoding geohash value
        try:
            return geohash.decode(text)
        except ValueError:
            return None
In [9]:
import pandas
import os
from ohapi import api
In [10]:
from ipyleaflet import Heatmap, LayerGroup

# Build the map with a search field and show a heatmap of the Overland
# locations recorded in April 2020.
bm = basemaps.OpenStreetMap['Mapnik']
m = Map(center=[50, 18], zoom=3,
        zoom_control=False,
        basemap=bm,
        layout=Layout(height='600px')
)

m.layers[0].name = bm['name']

# set up search text field and controller
search_tx = Text('', layout=Layout(width='500px'))
wc = WidgetControl(widget=search_tx, position='topleft')
# globals() lets names defined in this notebook be entered in the search field
con = MapController(m, search_tx, globs=globals())

# add controls
m.add_control(wc)
m.add_control(ZoomControl(position='topleft'))
m.add_control(FullScreenControl(position='topleft'))
layers_con = LayersControl(position='topright')
m.add_control(layers_con)

locations = []
circles = []
count = 0 

# collect [latitude, longitude] pairs from all Overland files of April 2020
member = api.exchange_oauth2_member(os.environ.get('OH_ACCESS_TOKEN'))
for f in member['data']:
    if f['basename'].startswith('overland-data-2020-04'):
        data = pd.read_csv(f['download_url'])
        # take both columns at once instead of looping over every row
        locations.extend(data[['latitude', 'longitude']].values.tolist())

layer_group = LayerGroup(layers=tuple(circles))
heatmap = Heatmap(locations=locations,radius=10)

m.add_layer(layer_group)
m.add_layer(heatmap)
# last expression of the cell: displays the map in the notebook
m
In [ ]: