Skip to content

GRID3 Async

Bases: AsyncBaseDataSource

Source code in nigeria_geodata/datasources/grid3.py
class AsyncGrid3(AsyncBaseDataSource):
    """Asynchronous facade over the synchronous ``Grid3`` data source.

    Every public coroutine forwards its arguments to the same-named method
    of a wrapped ``Grid3`` instance and awaits the result via the running
    event loop's default executor (see ``_run_sync``).
    """

    def __init__(self) -> None:
        """Create the synchronous ``Grid3`` instance that all calls delegate to."""
        self.sync_grid3 = Grid3()

    async def _run_sync(self, func, *args, **kwargs):
        """Run a synchronous callable in the loop's default executor.

        Args:
            func: The callable to execute.
            *args: Positional arguments passed to ``func``.
            **kwargs: Keyword arguments passed to ``func``.

        Returns:
            Whatever ``func(*args, **kwargs)`` returns.
        """
        # Imported locally so this method does not depend on a module-level
        # ``functools`` import being present.
        import functools

        loop = asyncio.get_running_loop()
        # ``run_in_executor`` only forwards positional arguments to the
        # callable; handing it ``**kwargs`` raises ``TypeError``. Binding
        # everything with ``partial`` supports keyword arguments as well.
        bound_call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, bound_call)

    async def list_data(
        self, dataframe: bool = True
    ) -> Union[List[EsriFeatureServiceBasicInfo], Optional["pd.DataFrame"]]:
        """Await ``Grid3.list_data`` executed off the event loop.

        Args:
            dataframe: Forwarded unchanged to the synchronous method.

        Returns:
            The value returned by ``Grid3.list_data``.
        """
        return await self._run_sync(self.sync_grid3.list_data, dataframe)

    async def search(
        self, query: str, dataframe: bool = True
    ) -> Union[List[EsriFeatureServiceBasicInfo], List, Optional["pd.DataFrame"]]:
        """Await ``Grid3.search`` executed off the event loop.

        Args:
            query: Forwarded unchanged to the synchronous method.
            dataframe: Forwarded unchanged to the synchronous method.

        Returns:
            The value returned by ``Grid3.search``.
        """
        return await self._run_sync(self.sync_grid3.search, query, dataframe)

    async def filter(
        self,
        data_name: str,
        state: Optional[str] = None,
        bbox: Optional[List[float]] = None,
        aoi_geometry: Optional[Geometry] = None,
        preview: bool = False,
        geodataframe: bool = False,
    ) -> Union[
        Optional["gpd.GeoDataFrame"],
        Optional["Map"],
        List[Dict[str, Any]],
    ]:
        """Await ``Grid3.filter`` executed off the event loop.

        All arguments are forwarded positionally, in the order declared
        here, to the synchronous method.

        Args:
            data_name: Forwarded unchanged to the synchronous method.
            state: Forwarded unchanged; defaults to ``None``.
            bbox: Forwarded unchanged; defaults to ``None``.
            aoi_geometry: Forwarded unchanged; defaults to ``None``.
            preview: Forwarded unchanged; defaults to ``False``.
            geodataframe: Forwarded unchanged; defaults to ``False``.

        Returns:
            The value returned by ``Grid3.filter``.
        """
        return await self._run_sync(
            self.sync_grid3.filter,
            data_name,
            state,
            bbox,
            aoi_geometry,
            preview,
            geodataframe,
        )

    async def info(
        self, data_name: str, dataframe: bool = True
    ) -> Union[EsriFeatureLayerInfo, Optional["pd.DataFrame"], Dict[str, Any]]:
        """Await ``Grid3.info`` executed off the event loop.

        Args:
            data_name: Forwarded unchanged to the synchronous method.
            dataframe: Forwarded unchanged to the synchronous method.

        Returns:
            The value returned by ``Grid3.info``.
        """
        return await self._run_sync(self.sync_grid3.info, data_name, dataframe)

    def __repr__(self) -> str:
        """Return a fixed, human-readable tag identifying this data source."""
        return "<AsyncGrid3DataSource>"