#%% Read and Process NWS Alerts
# Created on: 2023-01-17
#
# Cell-based script that:
#   1. downloads the active NWS alerts as JSON-LD records,
#   2. attaches a polygon to every alert (its own WKT geometry, or the
#      zone polygons looked up from the alert's UGC codes),
#   3. rasterises the alerts onto a regular 0.075 degree lon/lat grid,
#      storing one concatenated text record per grid point,
#   4. writes the grid to netCDF, reads one point back and parses it.

# Import Modules
import datetime
import re

import geopandas as gp
import pandas as pd
import numpy as np
import xarray as xr
from netCDF4 import Dataset
import nwswx
import requests
import shapely.wkt

#%% Setup NWS ID
nws = nwswx.WxAPI('api@alexanderrey.ca')

#%% Read NWS Alerts
# All alerts
# alertsIN = nws.alerts(return_format=nwswx.formats.JSONLD)

# Active alerts
alertsIN = nws.active_alerts(return_format=nwswx.formats.JSONLD)
nws_alerts = alertsIN['@graph']

# If there is more than one page of alerts, loop through and append them
while 'pagination' in alertsIN:
    # Get the next page of alerts. The first page was requested as JSON-LD
    # and read from '@graph', so ask for the same representation here; that
    # way every record in nws_alerts shares one schema.
    # NOTE(review): assumes the API honours the Accept header for paginated
    # URLs - confirm against a multi-page response.
    result = requests.get(
        alertsIN['pagination']['next'],
        headers={'Accept': 'application/ld+json'},
        timeout=30,
    )
    # Fail loudly on an HTTP error instead of trying to parse an error body
    result.raise_for_status()

    # Convert to JSON
    alertsIN = result.json()
    nws_alerts.extend(alertsIN['@graph'])

print('NWS Alerts: ', len(nws_alerts))

#%% Create NWS Alerts DataFrame
nws_alert_df = pd.DataFrame.from_records(nws_alerts)

#%% Read in NWS Polygon shapefiles as GeoDataFrame
z_shp = gp.read_file("C:/Users/arey/Downloads/z_13se22/z_13se22.shp")
fz_shp = gp.read_file("C:/Users/arey/Downloads/fz13se22/fz13se22.shp")
oz_shp = gp.read_file("C:/Users/arey/Downloads/oz22mr22/oz22mr22.shp")
mz_shp = gp.read_file("C:/Users/arey/Downloads/mz13se22/mz13se22.shp")
w_shp = gp.read_file("C:/Users/arey/Downloads/w_22mr22/w_22mr22.shp")
# ba_shp = gp.read_file("C:/Users/arey/Downloads/ba12my15/ba12my15.shp")
# hs_shp = gp.read_file("C:/Users/arey/Downloads/hs08mr23/hs08mr23.shp")
# s_shp = gp.read_file("C:/Users/arey/Downloads/s_22mr22/s_22mr22.shp")

#%% Match NWS Alerts to NWS Polygons
# Parallel lists: one polygon, one alert row and one source row number per
# matched (alert, polygon) pair. An alert with several UGC codes therefore
# appears once per matched polygon.
nws_alert_polygons = []
nws_alert_data = []
nws_alert_debug = []

# Membership lookups are loop invariant, so build them once
z_states = set(z_shp['STATE'].unique())
mz_ids = set(mz_shp['ID'].unique())
oz_ids = set(oz_shp['ID'].unique())

# Loop through each NWS Alert and match to NWS Polygon using UGC
for i, alert_row in nws_alert_df.iterrows():
    alert_geometry = alert_row['geometry']

    # If alert contains geometry as a (WKT) string
    if isinstance(alert_geometry, str):
        nws_alert_polygons.append(shapely.wkt.loads(alert_geometry))
        nws_alert_data.append(alert_row)
        nws_alert_debug.append(i)
        continue

    # Geometry Array
    if isinstance(alert_geometry, gp.array.GeometryArray):
        nws_alert_polygons.append(alert_geometry[0])
        nws_alert_data.append(alert_row)
        nws_alert_debug.append(i)
        continue

    # No usable geometry: get the UGC codes for the alert
    UGCs = alert_row['geocode']['UGC']

    # Loop through each UGC code and find matching polygon
    for ugc in UGCs:
        state = ugc[0:2]
        zone = ugc[3:6]
        alert_poly = None

        # Zone based warning
        if ugc[2] == 'Z' and state in z_states:
            alert_poly = z_shp[(z_shp['STATE'] == state)
                               & (z_shp['ZONE'] == zone)].geometry
        # Fire Weather Zone based warning
        # NOTE(review): 'C' codes are looked up in the fire-zone shapefile
        # but the state is validated against z_shp - confirm this is the
        # intended dataset for 'C' codes.
        elif ugc[2] == 'C' and state in z_states:
            alert_poly = fz_shp[(fz_shp['STATE'] == state)
                                & (fz_shp['ZONE'] == zone)].geometry
        # Marine Zone based warning
        elif ugc in mz_ids:
            alert_poly = mz_shp[mz_shp['ID'] == ugc].geometry
        # Offshore Zone based warning
        elif ugc in oz_ids:
            alert_poly = oz_shp[oz_shp['ID'] == ugc].geometry
        # elif ugc[0] == 'W':
        #     nws_alert_df['geometry'][i] = w_shp[w_shp['UGC'] == ugc].geometry.values

        # Unknown UGC type, or a known type with no matching polygon
        if alert_poly is None or len(alert_poly) == 0:
            print('No polygon found for UGC: ', ugc)
            print('Alert: ', alert_row['headline'])
            continue

        # Append polygon to list (first match only)
        nws_alert_polygons.append(alert_poly.values[0])
        nws_alert_data.append(alert_row)
        nws_alert_debug.append(i)

#%% Create GeoDataFrame of alert polygons
nws_alert_gdf = gp.GeoDataFrame(nws_alert_data, geometry=nws_alert_polygons)
nws_alert_gdf.reset_index(inplace=True)

#%% Create grid of points
xs = np.arange(-127, -65, 0.075)
ys = np.arange(24, 50, 0.075)
lons, lats = np.meshgrid(xs, ys)

# Create GeoSeries of Points
gridPointsSeries = gp.GeoSeries.from_xy(lons.flatten(), lats.flatten())

#%% Loop through each NWS Alert and find points within polygon
# One entry per grid point; None means "no alert covers this point"
alertData_1D = [None] * len(gridPointsSeries)

# Loop through NWS Alerts
for alertIDX in nws_alert_gdf.index:
    alertRow = nws_alert_gdf.loc[alertIDX]

    # Identify which points are within an alert polygon
    alertPoints = np.where(alertRow['geometry'].contains(gridPointsSeries))[0]

    # Nothing to record for an alert that covers no grid point
    if len(alertPoints) == 0:
        print(alertIDX)
        continue

    # Fall back to the headline when there is no description. pd.isna
    # catches both None (JSON null) and NaN (key absent from the record).
    alertDescription = alertRow['description']
    if pd.isna(alertDescription):
        alertDescription = alertRow['headline']

    # Fall back to the expiry time when there is no end time
    alertEndTime = alertRow['ends']
    if pd.isna(alertEndTime):
        alertEndTime = alertRow['expires']

    # The record only depends on the alert, so build it once per alert
    # rather than once per grid point. Layout:
    #   [headline}{description}{areaDesc}{onset}{end}{severity}{@id]
    alertRecord = '[' + '}{'.join([
        alertRow['headline'],
        alertDescription,
        alertRow['areaDesc'],
        alertRow['onset'],
        alertEndTime,
        alertRow['severity'],
        alertRow['@id'],
    ]) + ']'

    # Loop through points and append this alert to any existing alert data
    for alertPoint in alertPoints:
        existingAlerts = alertData_1D[alertPoint]
        if existingAlerts is None:
            alertData_1D[alertPoint] = alertRecord
        else:
            alertData_1D[alertPoint] = existingAlerts + alertRecord

    # Progress indicator
    print(alertIDX)

#%% Create xarray of alert data
# Convert list to numpy
list_alerts_1d_np = np.array(alertData_1D)

# Reshape back to 2d array
list_alerts_2d_np = list_alerts_1d_np.reshape(lons.shape)

# Convert to xarray
grid_alerts_xr = xr.DataArray(data=list_alerts_2d_np,
                              dims=["x", "y"],
                              coords={"lon": (("x", "y"), lons),
                                      "lat": (("x", "y"), lats)}
                              )

# Save as netcdf. The same name is used by the read-back cell below so that
# the file inspected is the file that was just written.
GRID_ALERTS_FILE = 'grid_alerts_contains8.nc'
grid_alerts_xr.to_netcdf(GRID_ALERTS_FILE, engine="h5netcdf",
                         encoding={"__xarray_dataarray_variable__":
                                   {'gzip': True, "compression_opts": 9}})

#%% Read in netcdf
# Context manager closes the file handle once the sample point is read
with Dataset(GRID_ALERTS_FILE) as grid_alerts_nc:
    grid_alert_pt = grid_alerts_nc['__xarray_dataarray_variable__'][176, 300]

    print(grid_alerts_nc['lon'][176, 300])
    print(grid_alerts_nc['lat'][176, 300])

#%% Process alert data string
# Each alert record is wrapped in [...]; raw string avoids invalid escapes
alertPattern = r'\[([^]]+)\]'

alertList = re.findall(alertPattern, grid_alert_pt)

# Loop through each alert
for alert in alertList:
    # Extract alert details (fields are separated by '}{')
    alertDetails = alert.split('}{')

    print(alertDetails)

    # Field 3 is the onset timestamp written by the gridding cell
    alertTime = datetime.datetime.strptime(alertDetails[3], '%Y-%m-%dT%H:%M:%S%z')