#%% Read and Process NWS Alerts
# Created on: 2023-01-17
#
# Downloads the active NWS alerts, resolves every alert to one or more
# polygons (its own geometry, or the zone shapefile polygons named by its UGC
# codes), stamps the alerts onto a regular lon/lat grid, saves the grid as
# netCDF, and finally reads one grid cell back and parses it as a self-check.

# Import Modules
import datetime
import re
import urllib.parse

import geopandas as gp
import numpy as np
import nwswx
import pandas as pd
import requests
import shapely.wkt
import xarray as xr
from netCDF4 import Dataset

#%% Setup NWS ID
nws = nwswx.WxAPI('api@alexanderrey.ca')

#%% Read NWS Alerts
# All alerts
# alertsIN = nws.alerts(return_format=nwswx.formats.JSONLD)

# Active alerts
alertsIN = nws.active_alerts(return_format=nwswx.formats.JSONLD)
nws_alerts = alertsIN['@graph']

# If there is more than one page of alerts, loop through and append them
while 'pagination' in alertsIN:
    # Request JSON-LD explicitly so follow-up pages carry the same '@graph'
    # record layout as the first page. NOTE(review): a request without an
    # Accept header appears to return a GeoJSON 'features' list instead, whose
    # records nest the alert fields and would not line up with the DataFrame
    # columns used below - confirm against the NWS API documentation.
    result = requests.get(alertsIN['pagination']['next'],
                          headers={'Accept': 'application/ld+json'},
                          timeout=60)
    result.raise_for_status()

    # Convert to JSON
    alertsIN = result.json()
    next_page_alerts = alertsIN.get('@graph', [])
    if not next_page_alerts:
        # An empty page means there is nothing left to collect
        break
    nws_alerts.extend(next_page_alerts)

print('NWS Alerts: ', len(nws_alerts))

#%% Create NWS Alerts DataFrame
nws_alert_df = pd.DataFrame.from_records(nws_alerts)

#%% Read in NWS Polygon shapefiles as GeoDataFrame
z_shp = gp.read_file("C:/Users/arey/Downloads/z_13se22/z_13se22.shp")
fz_shp = gp.read_file("C:/Users/arey/Downloads/fz13se22/fz13se22.shp")
oz_shp = gp.read_file("C:/Users/arey/Downloads/oz22mr22/oz22mr22.shp")
mz_shp = gp.read_file("C:/Users/arey/Downloads/mz13se22/mz13se22.shp")
w_shp = gp.read_file("C:/Users/arey/Downloads/w_22mr22/w_22mr22.shp")
# ba_shp = gp.read_file("C:/Users/arey/Downloads/ba12my15/ba12my15.shp")
# hs_shp = gp.read_file("C:/Users/arey/Downloads/hs08mr23/hs08mr23.shp")
# s_shp = gp.read_file("C:/Users/arey/Downloads/s_22mr22/s_22mr22.shp")

#%% Match NWS Alerts to NWS Polygons
# Parallel lists: one entry per (alert, polygon) pair
nws_alert_polygons = []
nws_alert_data = []
nws_alert_debug = []

# Build the membership lookups once instead of once per UGC code
z_states = set(z_shp['STATE'].unique())
mz_ids = set(mz_shp['ID'].unique())
oz_ids = set(oz_shp['ID'].unique())

# Loop through each NWS Alert and match to NWS Polygon using UGC
for i, alert_row in nws_alert_df.iterrows():
    alert_geometry = alert_row['geometry']

    # Alert carries its own geometry as a WKT string
    if isinstance(alert_geometry, str):
        nws_alert_polygons.append(shapely.wkt.loads(alert_geometry))
        nws_alert_data.append(alert_row)
        nws_alert_debug.append(i)
        continue

    # Alert carries its own geometry as a Geometry Array
    if isinstance(alert_geometry, gp.array.GeometryArray):
        nws_alert_polygons.append(alert_geometry[0])
        nws_alert_data.append(alert_row)
        nws_alert_debug.append(i)
        continue

    # No usable geometry: get the UGC codes for the alert
    UGCs = alert_row['geocode']['UGC']

    # Loop through each UGC code and find the matching polygon.
    # A code is read as: [0:2] state, [2] type letter, [3:6] zone number.
    for ugc in UGCs:
        alert_poly = None
        ugc_state = ugc[0:2]

        # Zone based warning
        if ugc[2] == 'Z' and ugc_state in z_states:
            alert_poly = z_shp[(z_shp['STATE'] == ugc_state) &
                               (z_shp['ZONE'] == ugc[3:6])].geometry
        # Fire Weather Zone based warning
        # NOTE(review): this branch tests the state against z_shp but takes
        # the polygon from fz_shp, and a 'C' type letter presumably denotes a
        # county code rather than a fire weather zone - confirm which
        # shapefile is intended. Behaviour is left unchanged.
        elif ugc[2] == 'C' and ugc_state in z_states:
            alert_poly = fz_shp[(fz_shp['STATE'] == ugc_state) &
                                (fz_shp['ZONE'] == ugc[3:6])].geometry
        # Marine Zone based warning
        elif ugc in mz_ids:
            alert_poly = mz_shp[mz_shp['ID'] == ugc].geometry
        # Offshore Zone based warning
        elif ugc in oz_ids:
            alert_poly = oz_shp[oz_shp['ID'] == ugc].geometry
        # elif ugc[0] == 'W':
        #     nws_alert_df['geometry'][i] = w_shp[w_shp['UGC'] == ugc].geometry.values

        # Unknown code type, or a known type with no matching polygon
        if alert_poly is None or len(alert_poly) == 0:
            print('No polygon found for UGC: ', ugc)
            print('Alert: ', alert_row['headline'])
            continue

        # Append polygon to list (first match only)
        nws_alert_polygons.append(alert_poly.values[0])
        nws_alert_data.append(alert_row)
        nws_alert_debug.append(i)

#%% Create GeoDataFrame of alert polygons
nws_alert_gdf = gp.GeoDataFrame(nws_alert_data, geometry=nws_alert_polygons)
# An alert with several UGC codes contributes several rows that share one
# source index, so renumber the rows to make the index unique.
nws_alert_gdf.reset_index(inplace=True)

#%% Create grid of points
xs = np.arange(-127, -65, 0.075)
ys = np.arange(24, 50, 0.075)
lons, lats = np.meshgrid(xs, ys)

# Create GeoSeries of Points
gridPointsSeries = gp.GeoSeries.from_xy(lons.flatten(), lats.flatten())


#%% Loop through each NWS Alert and find points within polygon
def _is_missing(value):
    """Return True when an alert field is absent (None or a float NaN)."""
    return value is None or (isinstance(value, float) and np.isnan(value))


def _alert_text(value):
    """Return an alert field as text, mapping a missing value to ''."""
    return '' if _is_missing(value) else str(value)


# One string per grid point. '' means "no alerts here"; using strings
# throughout (rather than None) keeps the array serialisable as text.
alertData_1D = [''] * len(gridPointsSeries)

# Loop through NWS Alerts
for alertIDX in nws_alert_gdf.index:
    # Identify which points are within the alert polygon
    alertPoints = np.where(
        nws_alert_gdf.loc[alertIDX, 'geometry'].contains(gridPointsSeries))[0]

    # The alert text is the same for every point it covers, so build it once
    # per alert rather than once per grid point.
    alertHeadline = _alert_text(nws_alert_gdf.loc[alertIDX, 'headline'])

    # Fall back to the headline when there is no description
    alertDescription = nws_alert_gdf.loc[alertIDX, 'description']
    if _is_missing(alertDescription):
        alertDescription = alertHeadline

    # Fall back to the expiry time when there is no end time
    alertEndTime = nws_alert_gdf.loc[alertIDX, 'ends']
    if _is_missing(alertEndTime):
        alertEndTime = nws_alert_gdf.loc[alertIDX, 'expires']

    alertURL = 'https://alerts-v2.weather.gov/#/?id=' + \
        urllib.parse.quote(_alert_text(nws_alert_gdf.loc[alertIDX, 'id']))

    # Record format: [headline}{description}{areaDesc}{onset}{end}{severity}{url]
    # NOTE(review): a field that itself contains ']' or '}{' would break the
    # parsing at the bottom of this script; the format is left unchanged.
    alertString = '[' + '}{'.join([
        alertHeadline,
        _alert_text(alertDescription),
        _alert_text(nws_alert_gdf.loc[alertIDX, 'areaDesc']),
        _alert_text(nws_alert_gdf.loc[alertIDX, 'onset']),
        _alert_text(alertEndTime),
        _alert_text(nws_alert_gdf.loc[alertIDX, 'severity']),
        alertURL,
    ]) + ']'

    # Append this alert to whatever the covered points already hold
    for alertPoint in alertPoints:
        alertData_1D[alertPoint] += alertString

    print(alertIDX)

#%% Create xarray of alert data
# Written here and read back below, so both cells share one path
GRID_ALERTS_FILE = 'grid_alerts_contains8.nc'

# Convert list to numpy. dtype=object keeps variable-length strings; without
# it NumPy would build a fixed-width array sized to the longest alert string.
list_alerts_1d_np = np.array(alertData_1D, dtype=object)

# Reshape back to 2d array
list_alerts_2d_np = list_alerts_1d_np.reshape(lons.shape)

# Convert to xarray
grid_alerts_xr = xr.DataArray(data=list_alerts_2d_np,
                              dims=["x", "y"],
                              coords={"lon": (("x", "y"), lons),
                                      "lat": (("x", "y"), lats)})

# Save as netcdf, using the h5py-style compression keys of the h5netcdf engine
grid_alerts_xr.to_netcdf(GRID_ALERTS_FILE, engine="h5netcdf",
                         encoding={"__xarray_dataarray_variable__":
                                   {"compression": "gzip",
                                    "compression_opts": 9}})

#%% Read in netcdf
# Grid cell used as a spot check of the saved file
SAMPLE_ROW, SAMPLE_COL = 176, 300

with Dataset(GRID_ALERTS_FILE) as grid_alerts_nc:
    grid_alert_pt = grid_alerts_nc['__xarray_dataarray_variable__'][SAMPLE_ROW, SAMPLE_COL]
    print(grid_alerts_nc['lon'][SAMPLE_ROW, SAMPLE_COL])
    print(grid_alerts_nc['lat'][SAMPLE_ROW, SAMPLE_COL])

#%% Process alert data string
# Each alert is stored as one [...] group
alertPattern = r'\[([^]]+)\]'
alertList = re.findall(alertPattern, grid_alert_pt)

# Loop through each alert
for alert in alertList:
    # Extract alert details
    alertDetails = alert.split('}{')
    print(alertDetails)

    # Field 3 is the onset time; it is empty when the alert had no onset
    if len(alertDetails) > 3 and alertDetails[3]:
        alertTime = datetime.datetime.strptime(alertDetails[3],
                                               '%Y-%m-%dT%H:%M:%S%z')
    else:
        alertTime = None