Skip to content

raster

Functions for spatial processing of raster TIFF files.

default_summary_func(v)

Sum an array and round to the nearest integer.

Source code in rastertoolkit/raster.py
199
200
201
def default_summary_func(v: np.ndarray) -> int:
    """Sum an array and round to the nearest integer."""
    return int(np.round(np.sum(v), 0))

extract_xy_info_from_raster(raster)

Extracts x, y, dx, and dy from a TiffPage object.

Parameters:

Name Type Description Default
raster TiffPage

Single tiff layer.

required

Returns: tuple: A tuple of x, y, dx, and dy.

Source code in rastertoolkit/raster.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def extract_xy_info_from_raster(raster: TiffPage) -> tuple[float, float, float, float]:
    """
    Extracts x, y, dx, and dy from a TiffPage object.

    Args:
        raster (TiffPage): Single tiff layer.

    Returns:
    tuple: A tuple of x, y, dx, and dy.
    """

    # Extract data from raster
    tags = get_tiff_tags(raster)
    point = tags["ModelTiepointTag"]
    scale = tags["ModelPixelScaleTag"]
    x0, y0 = point[3], point[4]
    dx, dy = scale[0], -scale[1]

    # Make sure values are in range
    assert -180 < x0 < 180, "Tie point x coordinate (longitude) have invalid range."
    assert -85 < y0 < 85, "Tie point y coordinate (latitude) have invalid range."
    assert 0 < dx < 1, "Pixel dx scale has invalid range."
    assert -1 < dy < 0, "Pixel dy scale has invalid range."
    nodata = getattr(raster, "nodata", None)
    if nodata is not None:
        assert nodata < 0, "Invalid no-data attribute; must be negative."

    return x0, y0, dx, dy

get_tiff_tags(raster)

Reads tags from a TiffPage object.

Parameters:

Name Type Description Default
raster TiffPage

Single tiff layer.

required

Returns:

Name Type Description
dict dict[str, Any]

A dictionary of tag names and values.

Source code in rastertoolkit/raster.py
204
205
206
207
208
209
210
211
212
213
214
def get_tiff_tags(raster: TiffPage) -> dict[str, Any]:
    """
    Reads tags from a TiffPage object.

    Args:
        raster (TiffPage): Single tiff layer.

    Returns:
        dict: A dictionary of tag names and values.
    """
    return {tag_obj.name: tag_obj.value for tag_obj in raster.tags}

init_sparse_matrix(raster)

Initialize a matrix from a raster TiffPage object with values > 0

Source code in rastertoolkit/raster.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def init_sparse_matrix(raster: TiffPage) -> np.ndarray:
    """Initialize a matrix from a raster TiffPage object with values > 0"""

    # Extract data from raster
    x0, y0, dx, dy = extract_xy_info_from_raster(raster)

    dat_mat = raster.asarray()
    xy_ints = np.argwhere(dat_mat > 0)
    sparse_data = np.zeros((xy_ints.shape[0], 3), dtype=float)

    # Construct sparse matrix of (long, lat, data)
    sparse_data[:, 0] = x0 + dx * xy_ints[:, 1] + dx / 2.0
    sparse_data[:, 1] = y0 + dy * xy_ints[:, 0] + dy / 2.0
    sparse_data[:, 2] = dat_mat[xy_ints[:, 0], xy_ints[:, 1]]

    return sparse_data

interpolate_at_weight_data(shape, weight_clip, value_clip, data_bool)

Interpolate at weight data.

Parameters:

Name Type Description Default
shape ShapeView

Shape object.

required
weight_clip ndarray

Clipped weight data.

required
value_clip ndarray

Clipped value data.

required
data_bool bool

Boolean indicating if the data is interior to the shape.

required

Returns:

Name Type Description
float float

The interpolated value at weight data.

Source code in rastertoolkit/raster.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def interpolate_at_weight_data(
    shape: ShapeView, weight_clip: np.ndarray, value_clip: np.ndarray, data_bool: bool
) -> float:
    """
    Interpolate at weight data.

    Args:
        shape (ShapeView): Shape object.
        weight_clip (np.ndarray): Clipped weight data.
        value_clip (np.ndarray): Clipped value data.
        data_bool (bool): Boolean indicating if the data is interior to the shape.

    Returns:
        float: The interpolated value at weight data.
    """
    # Calculate population weighted value
    weight = np.sum(weight_clip[data_bool, 2])

    # Prep interpolate coordinates and value arguments
    value_args = [value_clip[:, 0:2], value_clip[:, 2]]

    if weight > 0:
        # Interpolate at weight, assign -1 for problems
        val_est = interpolate.griddata(*value_args, weight_clip[:, 0:2], fill_value=-1)
        if -1 in val_est:
            err_dex = val_est == -1
            # Use the nearest value for problems
            val_rev = interpolate.griddata(
                *value_args, weight_clip[err_dex, 0:2], method="nearest"
            )
            val_est[err_dex] = val_rev
        # Use population to weight values
        final_val = np.sum(weight_clip[data_bool, 2] * val_est[data_bool]) / weight
    else:
        # No population data, interpolate at boundary, assign -1 for problems
        val_est = interpolate.griddata(*value_args, shape.points[:, 0:2], fill_value=-1)
        if -1 in val_est:
            err_dex = val_est == -1
            # Use the nearest value for problems
            val_rev = interpolate.griddata(
                *value_args, shape.points[err_dex, 0:2], method="nearest"
            )
            val_est[err_dex] = val_rev

        # Average values at shape perimeter
        final_val = np.mean(val_est)

    return final_val

is_interior(shape, data_clip)

Check if the data is interior to the shape.

Parameters:

Name Type Description Default
shape ShapeView

Shape object.

required
data_clip ndarray

Clipped data.

required

Returns:

Name Type Description
bool bool

True if the data is interior to the shape.

Source code in rastertoolkit/raster.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def is_interior(shape: ShapeView, data_clip: np.ndarray) -> bool:
    """
    Check if the data is interior to the shape.

    Args:
        shape (ShapeView): Shape object.
        data_clip (np.ndarray): Clipped data.

    Returns:
        bool: True if the data is interior to the shape.
    """
    # Track booleans (indicates if lat/long is interior)
    data_bool = np.zeros(data_clip.shape[0], dtype=bool)

    # Iterate over parts of shapefile
    for path_shp, area_prt in zip(shape.paths, shape.areas):
        # Union of positive areas; intersection with negative areas
        if area_prt > 0:
            data_bool = np.logical_or(
                data_bool, path_shp.contains_points(data_clip[:, :2])
            )
        else:
            data_bool = np.logical_and(
                data_bool, np.logical_not(path_shp.contains_points(data_clip[:, :2]))
            )

    return data_bool

print_status(shape, data_dict, k1, shape_count)

Print status message.

Source code in rastertoolkit/raster.py
350
351
352
353
354
355
356
357
358
359
360
361
def print_status(shape: ShapeView, data_dict: dict, k1: int, shape_count: int) -> None:
    """Print status message."""
    perc = round(100 * (k1 + 1) / shape_count)
    print(
        k1 + 1,
        "of",
        shape_count,
        f"({perc}%)",
        shape.name,
        shape.center,
        data_dict[shape.name],
    )

raster_clip(raster_file, shape_stem, shape_attr='DOTNAME', attr_filter=None, summary_func=None, include_latlon=False, quiet=False)

Extracts data from a raster based on shapes.

Parameters:

Name Type Description Default
raster_file str

Local path to a raster file.

required
shape_stem str

Local path stem referencing a set of shape files.

required
shape_attr str

The shape attribute name to be used as the output dictionary key.

'DOTNAME'
summary_func Callable

Aggregation function to be used for summarizing clipped data for each shape.

None
include_latlon bool

Flag to include lat/lon in the dictionary entry. Defaults to False.

False
quiet bool

Flag to control whether status messages are printed. Defaults to False.

False

Returns:

Name Type Description
dict dict[str, Union[float, int]]

A dictionary with dot names as keys and calculated aggregations as values.

Source code in rastertoolkit/raster.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def raster_clip(
    raster_file: Union[str, Path],
    shape_stem: Union[str, Path],
    shape_attr: str = "DOTNAME",
    attr_filter: Union[str, None] = None,
    summary_func: Callable = None,
    include_latlon: bool = False,
    quiet: bool = False,
) -> dict[str, Union[float, int]]:
    """
    Extracts data from a raster based on shapes.

    Args:
        raster_file (str): Local path to a raster file.
        shape_stem (str): Local path stem referencing a set of shape files.
        shape_attr (str): The shape attribute name to be used as the output dictionary key.
        summary_func (Callable): Aggregation function to be used for summarizing clipped data for each shape.
        include_latlon (bool, optional): Flag to include lat/lon in the dictionary entry. Defaults to False.
        quiet (bool, optional): Flag to control whether status messages are printed. Defaults to False.

    Returns:
        dict: A dictionary with dot names as keys and calculated aggregations as values.
    """
    assert Path(raster_file).is_file(), "Raster file not found."

    print("Loading data...")

    # Load data, init sparse matrix
    shapes = ShapeView.from_file(shape_stem, shape_attr, attr_filter)
    raster = TiffFile(raster_file)
    sparse_data = init_sparse_matrix(raster.pages[0])

    # Output dictionary
    data_dict = dict()
    shape_len = len(shapes)
    print("Clipping:")

    fts = {}
    # Init the futures executor
    executor = ThreadPoolExecutor(max_workers=(os.cpu_count() - 1))

    # Iterate over shapes in shapefile
    for k1, shp in enumerate(shapes):
        fts[k1] = executor.submit(
            raster_clip_single,
            shp=shp,
            sparse_data=sparse_data,
            k1=k1,
            shape_len=shape_len,
            summary_func=summary_func,
            include_latlon=include_latlon,
            quiet=quiet,
        )

    data_dict = {}
    for k1, ft in fts.items():
        data_dict.update(ft.result())

    executor.shutdown(wait=True)

    return data_dict

raster_clip_single(shp, sparse_data, k1, shape_len, summary_func, include_latlon, quiet)

Extracts data from a raster based on shapes.

Parameters:

Name Type Description Default
shp ShapeView

Shape object.

required
sparse_data ndarray

Sparse matrix of raster data.

required
k1 int

Index of the shape.

required
shape_len int

Total number of shapes.

required
summary_func Callable

Aggregation function to be used for summarizing clipped data for each shape.

required
include_latlon bool

Flag to include lat/lon in the dictionary entry.

required
quiet bool

Flag to control whether status messages are printed.

required

Returns:

Name Type Description
dict dict[str, Union[float, int]]

A dictionary with dot names as keys and calculated aggregations as values.

Source code in rastertoolkit/raster.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def raster_clip_single(
    shp: ShapeView,
    sparse_data: np.ndarray,
    k1: int,
    shape_len: int,
    summary_func: Callable,
    include_latlon: bool,
    quiet: bool
) -> dict[str, Union[float, int]]:
    """
    Extracts data from a raster based on shapes.

    Args:
        shp (ShapeView): Shape object.
        sparse_data (np.ndarray): Sparse matrix of raster data.
        k1 (int): Index of the shape.
        shape_len (int): Total number of shapes.
        summary_func (Callable): Aggregation function to be used for summarizing clipped data for each shape.
        include_latlon (bool): Flag to include lat/lon in the dictionary entry.
        quiet (bool): Flag to control whether status messages are printed.

    Returns:
        dict: A dictionary with dot names as keys and calculated aggregations as values.
    """
    data_dict = {}
    show_status = not quiet or k1 % 1000 == 0 or k1 in [0, shape_len - 1]
    # Null shape; error in shapefile
    shp.validate()

    # Subset population data matrix for clipping
    data_clip = subset_matrix_for_clipping(shp, sparse_data)

    if data_clip.shape[0] == 0:
        data_dict[shp.name] = summary_entry(shp, {"pop": 0}, include_latlon)
        if show_status:
            print_status(shp, data_dict, k1, shape_len)
        return data_dict

    # Pop values
    value = data_clip[is_interior(shp, data_clip), 2]

    # Entry dictionary
    summary_func = summary_func or default_summary_func
    entry = {"pop": summary_func(value)}

    # Set entry and print status
    data_dict[shp.name] = summary_entry(shp, entry, include_latlon)
    if show_status:
        print_status(shp, data_dict, k1, shape_len)

    return data_dict

raster_clip_weighted(raster_weight, raster_value, shape_stem, shape_attr='DOTNAME', attr_filter=None, weight_summary_func=None, include_latlon=False)

Extracts data from a raster based on shapes.

Parameters:

Name Type Description Default
raster_weight str

Local path to a raster file used for weights.

required
raster_value str

Local path to a raster file used for values.

required
shape_stem str

Local path stem referencing a set of shape files.

required
shape_attr str

The shape attribute name to be used as the output dictionary key.

'DOTNAME'
weight_summary_func Callable

Aggregation function to be used for summarizing clipped data for each shape.

None
include_latlon bool

Flag to include lat/lon in the dictionary entry. Defaults to False.

False

Returns:

Name Type Description
dict dict[str, Union[float, int]]

A dictionary with dot names as keys and calculated aggregations as values.

Source code in rastertoolkit/raster.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def raster_clip_weighted(
    raster_weight: Union[str, Path],
    raster_value: Union[str, Path],
    shape_stem: Union[str, Path],
    shape_attr: str = "DOTNAME",
    attr_filter: Union[str, None] = None,
    weight_summary_func: Callable = None,
    include_latlon: bool = False,
) -> dict[str, Union[float, int]]:
    """
    Extracts data from a raster based on shapes.

    Args:
        raster_weight (str): Local path to a raster file used for weights.
        raster_value (str): Local path to a raster file used for values.
        shape_stem (str): Local path stem referencing a set of shape files.
        shape_attr (str): The shape attribute name to be used as the output dictionary key.
        weight_summary_func (Callable): Aggregation function to be used for summarizing clipped data for each shape.
        include_latlon (bool, optional): Flag to include lat/lon in the dictionary entry. Defaults to False.

    Returns:
        dict: A dictionary with dot names as keys and calculated aggregations as values.
    """
    assert Path(raster_weight).is_file(), "Population raster file not found."
    assert Path(raster_value).is_file(), "Values raster file not found."

    # Load data shape and rasters
    shapes = ShapeView.from_file(shape_stem, shape_attr, attr_filter)
    raster_weights = TiffFile(raster_weight)
    raster_values = TiffFile(raster_value)

    # Init sparse matrices
    sparse_pop = init_sparse_matrix(raster_weights.pages[0])
    sparse_val = init_sparse_matrix(raster_values.pages[0])

    # Output dictionary
    data_dict = dict()

    # Iterate over shapes in shapefile
    for k1, shp in enumerate(shapes):
        # Null shape; error in shapefile
        shp.validate()

        # Subset matrices for clipping
        pop_clip = subset_matrix_for_clipping(shape=shp, sparse_data=sparse_pop)
        val_clip = subset_matrix_for_clipping(shape=shp, sparse_data=sparse_val, pad=1)

        # Track booleans (indicates if lat/long is interior)
        data_bool = is_interior(shp, pop_clip)

        # Interpolate at population data
        final_val = interpolate_at_weight_data(shp, pop_clip, val_clip, data_bool)

        # Pop values
        values = pop_clip[data_bool, 2]

        # Entry dictionary
        weight_summary_func = weight_summary_func or default_summary_func
        entry = {"pop": weight_summary_func(values), "val": final_val}

        # Set entry and print status
        data_dict[shp.name] = summary_entry(shp, entry, include_latlon)
        print_status(shp, data_dict, k1, len(shapes))

    return data_dict

subset_matrix_for_clipping(shape, sparse_data, pad=0)

Subset the matrix for clipping

Parameters:

Name Type Description Default
shape ShapeView

Shape object.

required
sparse_data ndarray

Sparse matrix of raster data.

required
pad int

Padding for clipping.

0

Returns:

Type Description
ndarray

np.ndarray: A subset of the matrix for clipping.

Source code in rastertoolkit/raster.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def subset_matrix_for_clipping(
    shape: ShapeView, sparse_data: np.ndarray, pad: int = 0
) -> np.ndarray:
    """
    Subset the matrix for clipping

    Args:
        shape (ShapeView): Shape object.
        sparse_data (np.ndarray): Sparse matrix of raster data.
        pad (int): Padding for clipping.

    Returns:
        np.ndarray: A subset of the matrix for clipping.
    """
    clip_bool1 = np.logical_and(
        sparse_data[:, 0] > shape.xy_min[0] - pad,
        sparse_data[:, 1] > shape.xy_min[1] - pad,
    )
    clip_bool2 = np.logical_and(
        sparse_data[:, 0] < shape.xy_max[0] + pad,
        sparse_data[:, 1] < shape.xy_max[1] + pad,
    )
    data_clip = sparse_data[np.logical_and(clip_bool1, clip_bool2), :]

    return data_clip

summary_entry(shape, entry, include_latlon)

Summarize the entry for the shape.

Parameters:

Name Type Description Default
shape ShapeView

Shape object.

required
entry Union[dict, float, int]

Entry for the shape.

required
include_latlon bool

Flag to include lat/lon in the dictionary entry.

required

Returns:

Type Description
Union[dict, float, int]

Union[dict, float, int]: The summarized entry for the shape.

Source code in rastertoolkit/raster.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def summary_entry(
    shape: ShapeView, entry: Union[dict, float, int], include_latlon: bool
) -> Union[dict, float, int]:
    """
    Summarize the entry for the shape.

    Args:
        shape (ShapeView): Shape object.
        entry (Union[dict, float, int]): Entry for the shape.
        include_latlon (bool): Flag to include lat/lon in the dictionary entry.

    Returns:
        Union[dict, float, int]: The summarized entry for the shape.
    """
    if include_latlon:
        assert isinstance(entry, dict) and len(entry) > 0, "Invalid entry."
        lon = shape.center[0] if shape else np.nan
        lat = shape.center[1] if shape else np.nan
        final_entry = {"lat": lat, "lon": lon}
        final_entry.update(entry)
    else:
        if isinstance(entry, dict) and len(entry) == 1:
            final_entry = list(entry.values())[0]
        else:
            final_entry = entry

    return final_entry