class AsyncGrid3(AsyncBaseDataSource):
def __init__(self):
self.sync_grid3 = Grid3()
async def _run_sync(self, func, *args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
async def list_data(
self, dataframe: bool = True
) -> Union[List[EsriFeatureServiceBasicInfo], Optional["pd.DataFrame"]]:
return await self._run_sync(self.sync_grid3.list_data, dataframe)
async def search(
self, query: str, dataframe: bool = True
) -> Union[List[EsriFeatureServiceBasicInfo], List, Optional["pd.DataFrame"]]:
return await self._run_sync(self.sync_grid3.search, query, dataframe)
async def filter(
self,
data_name: str,
state: Optional[str] = None,
bbox: Optional[List[float]] = None,
aoi_geometry: Geometry = None,
preview: bool = False,
geodataframe: bool = False,
) -> Union[
Optional["gpd.GeoDataFrame"],
Optional["Map"],
List[Dict[str, Any]],
]:
return await self._run_sync(
self.sync_grid3.filter,
data_name,
state,
bbox,
aoi_geometry,
preview,
geodataframe,
)
async def info(
self, data_name: str, dataframe: bool = True
) -> Union[EsriFeatureLayerInfo, Optional["pd.DataFrame"], Dict[str, Any]]:
return await self._run_sync(self.sync_grid3.info, data_name, dataframe)
def __repr__(self) -> str:
return "<AsyncGrid3DataSource>"