Skip to content

ftmq.query

Source code in ftmq/query.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
class Query:
    DEFAULT_SEARCH_PROPS = (
        Properties["name"],
        Properties["firstName"],
        Properties["middleName"],
        Properties["lastName"],
    )

    def __init__(
        self,
        filters: Iterable[F] | None = None,
        search_filters: Iterable[F] | None = None,
        aggregations: Iterable[Aggregation] | None = None,
        aggregator: Aggregator | None = None,
        sort: Sort | None = None,
        slice: Slice | None = None,
    ):
        """
        Set up the state of a query.

        Args:
            filters: Filters, kept as a set on the instance
            search_filters: Search filters, kept as a set on the instance
            aggregations: Aggregations, kept as a set on the instance
            aggregator: Stored as given
            sort: Stored as given
            slice: Stored as given
        """
        # `_chain` passes `self.__dict__` back into this constructor as keyword
        # arguments, so every attribute set here must be named exactly like
        # the parameter it comes from.
        self.aggregator = aggregator
        self.sort = sort
        self.slice = slice
        # the collection arguments are normalized to sets
        self.filters = set(ensure_list(filters))
        self.search_filters = set(ensure_list(search_filters))
        self.aggregations = set(ensure_list(aggregations))

    def __getitem__(self, value: Any) -> Q:
        """
        Implement list-like slicing. No negative values allowed.

        Examples:
            >>> q[1]
            # 2nd element (0-index)
            >>> q[:10]
            # first 10 elements
            >>> q[10:20]
            # next 10 elements

        Args:
            value: A non-negative integer index or a `slice` without a step

        Returns:
            The updated `Query` instance

        Raises:
            ValidationError: For a negative index, a negative slice bound or
                a slice that defines a step
            NotImplementedError: If `value` is neither an `int` nor a `slice`
        """
        if isinstance(value, int):
            if value < 0:
                raise ValidationError(f"Invalid slicing: `{value}`")
            # a single index is stored as a one-element window
            return self._chain(slice=slice(value, value + 1))
        if isinstance(value, slice):
            if value.step is not None:
                raise ValidationError(f"Invalid slicing: `{value}`")
            # Negative bounds used to slip through here and only fail later,
            # e.g. in the `islice` call within `apply_iter`.
            for bound in (value.start, value.stop):
                if isinstance(bound, int) and bound < 0:
                    raise ValidationError(f"Invalid slicing: `{value}`")
            return self._chain(slice=value)
        raise NotImplementedError

    def __bool__(self) -> bool:
        """
        Tell whether this query carries any state.

        The query is truthy as soon as `to_dict()` returns a non-empty
        dictionary.

        Examples:
            >>> bool(Query())
            False
            >>> bool(Query().where(dataset="my_dataset"))
            True
        """
        return True if self.to_dict() else False

    def __hash__(self) -> int:
        """
        Hash of the current state, intended as a cache key.

        The value is derived from the `repr` of `to_dict()`.
        """
        state = self.to_dict()
        return hash(repr(state))

    def _chain(self, **kwargs):
        """
        Build a new instance from the current state merged with `kwargs`.

        For each given key the incoming value replaces the stored one when
        either of them is `None` or when the key is `sort`. Otherwise list-like
        values are united (as a sorted list), mappings are merged with the
        incoming entries winning, and anything else is replaced.
        """
        merged = dict(vars(self))
        for name, incoming in kwargs.items():
            existing = merged[name]
            # nothing stored yet, an explicit reset via `None`, or a new
            # ordering: in all three cases the incoming value is taken as is
            takeover = existing is None or incoming is None or name == "sort"
            if not takeover and is_listish(existing):
                merged[name] = sorted(set(existing) | set(incoming))
            elif not takeover and is_mapping(existing):
                merged[name] = {**existing, **incoming}
            else:
                merged[name] = incoming
        return self.__class__(**merged)

    def _get_lookups(self, filters: set[F]) -> dict[str, Any]:
        """
        Collect the `to_dict()` items of all given filters in one dictionary.

        If the value already stored for a key is list-like, the new value is
        appended to it; in every other case the key is (re)assigned.
        """
        collected = {}
        for item in filters:
            for name, val in item.to_dict().items():
                if is_listish(collected.get(name)):
                    collected[name].append(val)
                    continue
                collected[name] = val
        return collected

    @property
    def lookups(self) -> dict[str, Any]:
        """
        Dictionary of the lookups of all filters in `self.filters`
        """
        active = self.filters
        return self._get_lookups(active)

    @property
    def search_lookups(self) -> dict[str, Any]:
        """
        Dictionary of the lookups of all filters in `self.search_filters`
        """
        active = self.search_filters
        return self._get_lookups(active)

    @property
    def limit(self) -> int | None:
        """
        The number of items to return, derived from the slice.

        `None` without a slice. With both a truthy start and stop it is their
        difference, otherwise the stop value itself.
        """
        window = self.slice
        if window is None:
            return None
        lower, upper = window.start, window.stop
        if not (lower and upper):
            return upper
        return upper - lower

    @property
    def offset(self) -> int | None:
        """
        The start of the slice, or `None` if no slice is set
        """
        if not self.slice:
            return None
        return self.slice.start

    @property
    def sql(self) -> Sql:
        """
        A `Sql` object built from this query, for use with sql interfaces
        """
        wrapper = Sql(self)
        return wrapper

    @property
    def ids(self) -> set[IdFilter]:
        """
        All `IdFilter` instances among the current filters
        """
        found = set()
        for candidate in self.filters:
            if isinstance(candidate, IdFilter):
                found.add(candidate)
        return found

    @property
    def datasets(self) -> set[DatasetFilter]:
        """
        All `DatasetFilter` instances among the current filters
        """
        found = set()
        for candidate in self.filters:
            if isinstance(candidate, DatasetFilter):
                found.add(candidate)
        return found

    @property
    def dataset_names(self) -> set[str]:
        """
        The values of all dataset filters, flattened into one set
        """
        return {
            name
            for dataset_filter in self.datasets
            for name in ensure_list(dataset_filter.value)
        }

    @property
    def schemata(self) -> set[SchemaFilter]:
        """
        All `SchemaFilter` instances among the current filters
        """
        found = set()
        for candidate in self.filters:
            if isinstance(candidate, SchemaFilter):
                found.add(candidate)
        return found

    @property
    def schemata_names(self) -> set[str]:
        """
        The values of all schema filters, flattened into one set
        """
        return {
            name
            for schema_filter in self.schemata
            for name in ensure_list(schema_filter.value)
        }

    @property
    def countries(self) -> set[str]:
        """
        The values of all property filters whose key is `country`
        """
        return {
            code
            for prop_filter in self.properties
            if prop_filter.key == "country"
            for code in ensure_list(prop_filter.value)
        }

    @property
    def reversed(self) -> set[ReverseFilter]:
        """
        All `ReverseFilter` instances among the current filters
        """
        found = set()
        for candidate in self.filters:
            if isinstance(candidate, ReverseFilter):
                found.add(candidate)
        return found

    @property
    def properties(self) -> set[PropertyFilter]:
        """
        All `PropertyFilter` instances among the current filters
        """
        found = set()
        for candidate in self.filters:
            if isinstance(candidate, PropertyFilter):
                found.add(candidate)
        return found

    def discard(self, f_cls: F) -> None:
        """
        Remove, in place, every filter that is an instance of `f_cls`.

        Args:
            f_cls: The filter class to check against via `isinstance`
        """
        # collect first, then remove: the set must not change while iterated
        doomed = {item for item in self.filters if isinstance(item, f_cls)}
        self.filters.difference_update(doomed)

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the current state into a dictionary.

        Starts from the filter lookups and adds, only when set, the keys
        `search`, `order_by`, `limit` / `offset` and `aggregations`, in that
        order.

        Example:
            ```python
            q = Query().where(dataset__in=["d1", "d2"])
            assert q.to_dict() == {"dataset__in": {"d1", "d2"}}
            q = q.where(schema="Event").where(schema__in=["Person", "Organization"])
            assert q.to_dict() == {
                    "dataset__in": {"d1", "d2"},
                    "schema": "Event",
                    "schema__in": {"Organization", "Person"},
                }
            ```
        """
        result = self.lookups
        search = self.search_lookups
        if search:
            result["search"] = search
        if self.sort:
            result["order_by"] = self.sort.serialize()
        if self.slice:
            # both keys always come as a pair
            result.update(limit=self.limit, offset=self.offset)
        if self.aggregations:
            result["aggregations"] = self.get_aggregator().to_dict()
        return result

    def where(self, **lookup: Any) -> Q:
        """
        Add further lookups to the current `Query` instance.

        The resulting filters are put into `self.filters` in place; the return
        value is the result of `self._chain()`.

        Example:
            ```python
            q = Query().where(dataset="my_dataset")
            q = q.where(schema="Payment")
            q = q.where(date__gte="2024-10", date__lt="2024-11")
            q = q.order_by("amountEur", ascending=False)
            ```

        Args:
            **lookup: A dataset lookup `dataset="my_dataset"`
            **lookup: A schema lookup `schema="Person"`
            **lookup: `include_descendants=True`: Include schema descendants for
                given schema lookup
            **lookup: `include_matchable=True`: Include matchable schema for
                given schema lookup
            **lookup: `prop`, `value` and optionally `comparator`: An explicit
                property lookup
            **lookup: A property=value lookup (with optional comparators):
                `name__startswith="Ja"`

        Returns:
            The updated `Query` instance

        Raises:
            ValidationError: If `prop` is given without a `value`
        """

        def _replace(new_filter) -> None:
            # replace existing property filter with updated one
            self.filters.discard(new_filter)
            self.filters.add(new_filter)

        include_descendants = lookup.pop("include_descendants", False)
        include_matchable = lookup.pop("include_matchable", False)
        explicit_prop = lookup.pop("prop", None)
        explicit_value = lookup.pop("value", None)
        explicit_comparator = lookup.pop("comparator", None)
        if explicit_prop is not None:
            if explicit_value is None:
                raise ValidationError("No lookup value specified")
            _replace(
                PropertyFilter(explicit_prop, explicit_value, explicit_comparator)
            )

        # first pass: lookups whose key starts with one of the `FILTERS` keys
        unknown: dict[str, Any] = {}
        for name, wanted in lookup.items():
            for prefix, filter_cls in FILTERS.items():
                if not name.startswith(prefix):
                    continue
                if wanted is None:
                    # a `None` value drops all filters of that class
                    self.discard(filter_cls)
                else:
                    base, operator = parse_comparator(name)
                    extra = {}
                    if base == "schema":
                        extra = {
                            "include_matchable": include_matchable,
                            "include_descendants": include_descendants,
                        }
                    self.filters.add(filter_cls(wanted, operator, **extra))
                break
            else:
                # no prefix matched: handled as property lookup below
                unknown[name] = wanted

        # second pass: parse arbitrary `date__gte=2023` stuff
        for name, raw in unknown.items():
            for parsed in parse_unknown_filters((name, raw)):
                parsed_prop, parsed_value, parsed_comparator = parsed
                _replace(PropertyFilter(parsed_prop, parsed_value, parsed_comparator))

        return self._chain()

    def search(self, q: str, props: Iterable[Properties | str] | None = None) -> Q:
        """
        Set a search term, replacing any previously defined search.

        One `PropertyFilter` with the `ilike` comparator is created per
        property and stored in `self.search_filters` in place.

        Args:
            q: The search term
            props: Properties to search in, falls back to
                `DEFAULT_SEARCH_PROPS` if empty or not given

        Returns:
            The result of `self._chain()`
        """
        # reset existing search
        self.search_filters: set[F] = set()
        props = props or self.DEFAULT_SEARCH_PROPS
        for prop in props:
            self.search_filters.add(PropertyFilter(prop, q, Comparators.ilike))
        return self._chain()

    def order_by(self, *values: Iterable[str], ascending: bool | None = True) -> Q:
        """
        Define the sorting, replacing any sorting set before.

        The new `Sort` is assigned to `self.sort` in place.

        Args:
            *values: Fields to order by
            ascending: Ascending or descending

        Returns:
            The updated `Query` instance.
        """
        ordering = Sort(values=values, ascending=ascending)
        self.sort = ordering
        return self._chain()

    def aggregate(
        self,
        func: Aggregations,
        *props: Properties,
        groups: Properties | list[Properties] | None = None,
    ) -> Q:
        """
        Register one aggregation per given property.

        The `Aggregation` objects are added to `self.aggregations` in place.

        Args:
            func: The aggregation function
            *props: The properties to aggregate
            groups: Optional property or properties to group by

        Returns:
            The result of `self._chain()`
        """
        self.aggregations.update(
            Aggregation(func=func, prop=target, group_props=ensure_list(groups))
            for target in props
        )
        return self._chain()

    def get_aggregator(self) -> Aggregator:
        """
        Create a new `Aggregator` for the current aggregations
        """
        current = self.aggregations
        return Aggregator(aggregations=current)

    def apply_filter(self, proxy: CE) -> bool:
        """
        Test a proxy against the filters: without any filter it always
        passes, otherwise every filter has to match.
        """
        return not self.filters or all(
            check.apply(proxy) for check in self.filters
        )

    def apply_search(self, proxy: CE) -> bool:
        """
        Test a proxy against the search filters: without any search filter it
        always passes, otherwise at least one of them has to match.
        """
        return not self.search_filters or any(
            check.apply(proxy) for check in self.search_filters
        )

    def apply(self, proxy: CE) -> bool:
        """
        Check a proxy against the current `Query` instance: it has to pass
        the filters first and then the search.
        """
        return self.apply_search(proxy) if self.apply_filter(proxy) else False

    def apply_iter(self, proxies: CEGenerator) -> CEGenerator:
        """
        Run a stream of proxies through the current `Query` instance.

        An empty query passes everything through untouched. Otherwise the
        proxies are filtered, then sorted, sliced and aggregated, each step
        only if it is configured. The aggregator in use is kept on
        `self.aggregator`.

        Example:
            ```python
            proxies = [...]
            q = Query().where(dataset="my_dataset", schema="Company")
            for proxy in q.apply_iter(proxies):
                assert proxy.schema.name == "Company"
            ```

        Yields:
            A generator of `nomenklatura.entity.CompositeEntity`
        """
        if not self:
            yield from proxies
            return

        stream = (proxy for proxy in proxies if self.apply(proxy))
        if self.sort:
            stream = self.sort.apply_iter(stream)
        if self.slice:
            window = self.slice
            stream = islice(stream, window.start, window.stop, window.step)
        if self.aggregations:
            self.aggregator = self.get_aggregator()
            stream = self.aggregator.apply(stream)
        yield from stream

countries: set[str] property

The current filtered countries

dataset_names: set[str] property

The names of the current filtered datasets

datasets: set[DatasetFilter] property

The current dataset filters

ids: set[IdFilter] property

The current id filters

limit: int | None property

The current limit (inferred from a slice)

lookups: dict[str, Any] property

The current filter lookups as dictionary

offset: int | None property

The current offset (inferred from a slice)

properties: set[PropertyFilter] property

The current property lookup filters

reversed: set[ReverseFilter] property

The current reverse lookup filters

schemata: set[SchemaFilter] property

The current schema filters

schemata_names: set[str] property

The names of the current filtered schemas

search_lookups: dict[str, Any] property

The current search lookups as dictionary

sql: Sql property

An object of this query used for sql interfaces

__bool__()

Detect if any filter, ordering or slicing is defined

Examples:

>>> bool(Query())
False
>>> bool(Query().where(dataset="my_dataset"))
True
Source code in ftmq/query.py
def __bool__(self) -> bool:
    """
    Tell whether this query carries any state.

    The query is truthy as soon as `to_dict()` returns a non-empty
    dictionary.

    Examples:
        >>> bool(Query())
        False
        >>> bool(Query().where(dataset="my_dataset"))
        True
    """
    return True if self.to_dict() else False

__getitem__(value)

Implement list-like slicing. No negative values allowed.

Examples:

>>> q[1]
# 2nd element (0-index)
>>> q[:10]
# first 10 elements
>>> q[10:20]
# next 10 elements

Returns:

Type Description
Q

The updated Query instance

Source code in ftmq/query.py
def __getitem__(self, value: Any) -> Q:
    """
    Implement list-like slicing. No negative values allowed.

    Examples:
        >>> q[1]
        # 2nd element (0-index)
        >>> q[:10]
        # first 10 elements
        >>> q[10:20]
        # next 10 elements

    Args:
        value: A non-negative integer index or a `slice` without a step

    Returns:
        The updated `Query` instance

    Raises:
        ValidationError: For a negative index, a negative slice bound or
            a slice that defines a step
        NotImplementedError: If `value` is neither an `int` nor a `slice`
    """
    if isinstance(value, int):
        if value < 0:
            raise ValidationError(f"Invalid slicing: `{value}`")
        # a single index is stored as a one-element window
        return self._chain(slice=slice(value, value + 1))
    if isinstance(value, slice):
        if value.step is not None:
            raise ValidationError(f"Invalid slicing: `{value}`")
        # Negative bounds used to slip through here and only fail later,
        # e.g. in the `islice` call within `apply_iter`.
        for bound in (value.start, value.stop):
            if isinstance(bound, int) and bound < 0:
                raise ValidationError(f"Invalid slicing: `{value}`")
        return self._chain(slice=value)
    raise NotImplementedError

__hash__()

Generate a unique key of the current state, useful for caching

Source code in ftmq/query.py
def __hash__(self) -> int:
    """
    Hash of the current state, intended as a cache key.

    The value is derived from the `repr` of `to_dict()`.
    """
    state = self.to_dict()
    return hash(repr(state))

apply(proxy)

Test if a proxy matches the current Query instance.

Source code in ftmq/query.py
def apply(self, proxy: CE) -> bool:
    """
    Check a proxy against the current `Query` instance: it has to pass
    the filters first and then the search.
    """
    return self.apply_search(proxy) if self.apply_filter(proxy) else False

apply_iter(proxies)

Apply the current Query instance to a generator of proxies and return a generator of filtered proxies

Example
proxies = [...]
q = Query().where(dataset="my_dataset", schema="Company")
for proxy in q.apply_iter(proxies):
    assert proxy.schema.name == "Company"

Yields:

Type Description
CEGenerator

A generator of nomenklatura.entity.CompositeEntity

Source code in ftmq/query.py
def apply_iter(self, proxies: CEGenerator) -> CEGenerator:
    """
    Run a stream of proxies through the current `Query` instance.

    An empty query passes everything through untouched. Otherwise the
    proxies are filtered, then sorted, sliced and aggregated, each step
    only if it is configured. The aggregator in use is kept on
    `self.aggregator`.

    Example:
        ```python
        proxies = [...]
        q = Query().where(dataset="my_dataset", schema="Company")
        for proxy in q.apply_iter(proxies):
            assert proxy.schema.name == "Company"
        ```

    Yields:
        A generator of `nomenklatura.entity.CompositeEntity`
    """
    if not self:
        yield from proxies
        return

    stream = (proxy for proxy in proxies if self.apply(proxy))
    if self.sort:
        stream = self.sort.apply_iter(stream)
    if self.slice:
        window = self.slice
        stream = islice(stream, window.start, window.stop, window.step)
    if self.aggregations:
        self.aggregator = self.get_aggregator()
        stream = self.aggregator.apply(stream)
    yield from stream

order_by(*values, ascending=True)

Add or update the current sorting.

Parameters:

Name Type Description Default
*values Iterable[str]

Fields to order by

()
ascending bool | None

Ascending or descending

True

Returns:

Type Description
Q

The updated Query instance.

Source code in ftmq/query.py
def order_by(self, *values: Iterable[str], ascending: bool | None = True) -> Q:
    """
    Define the sorting, replacing any sorting set before.

    The new `Sort` is assigned to `self.sort` in place.

    Args:
        *values: Fields to order by
        ascending: Ascending or descending

    Returns:
        The updated `Query` instance.
    """
    ordering = Sort(values=values, ascending=ascending)
    self.sort = ordering
    return self._chain()

to_dict()

Dictionary representation of the current object

Example
q = Query().where(dataset__in=["d1", "d2"])
assert q.to_dict() == {"dataset__in": {"d1", "d2"}}
q = q.where(schema="Event").where(schema__in=["Person", "Organization"])
assert q.to_dict() == {
        "dataset__in": {"d1", "d2"},
        "schema": "Event",
        "schema__in": {"Organization", "Person"},
    }
Source code in ftmq/query.py
def to_dict(self) -> dict[str, Any]:
    """
    Serialize the current state into a dictionary.

    Starts from the filter lookups and adds, only when set, the keys
    `search`, `order_by`, `limit` / `offset` and `aggregations`, in that
    order.

    Example:
        ```python
        q = Query().where(dataset__in=["d1", "d2"])
        assert q.to_dict() == {"dataset__in": {"d1", "d2"}}
        q = q.where(schema="Event").where(schema__in=["Person", "Organization"])
        assert q.to_dict() == {
                "dataset__in": {"d1", "d2"},
                "schema": "Event",
                "schema__in": {"Organization", "Person"},
            }
        ```
    """
    result = self.lookups
    search = self.search_lookups
    if search:
        result["search"] = search
    if self.sort:
        result["order_by"] = self.sort.serialize()
    if self.slice:
        # both keys always come as a pair
        result.update(limit=self.limit, offset=self.offset)
    if self.aggregations:
        result["aggregations"] = self.get_aggregator().to_dict()
    return result

where(**lookup)

Add another lookup to the current Query instance.

Example
q = Query().where(dataset="my_dataset")
q = q.where(schema="Payment")
q = q.where(date__gte="2024-10", date__lt="2024-11")
q = q.order_by("amountEur", ascending=False)

Parameters:

Name Type Description Default
**lookup Any

A dataset lookup dataset="my_dataset"

{}
**lookup Any

A schema lookup schema="Person"

{}
**lookup Any

include_descendants=True: Include schema descendants for given schema lookup

{}
**lookup Any

include_matchable=True: Include matchable schema for given schema lookup

{}
**lookup Any

A property=value lookup (with optional comparators): name__startswith="Ja"

{}

Returns:

Type Description
Q

The updated Query instance

Source code in ftmq/query.py
def where(self, **lookup: Any) -> Q:
    """
    Add further lookups to the current `Query` instance.

    The resulting filters are put into `self.filters` in place; the return
    value is the result of `self._chain()`.

    Example:
        ```python
        q = Query().where(dataset="my_dataset")
        q = q.where(schema="Payment")
        q = q.where(date__gte="2024-10", date__lt="2024-11")
        q = q.order_by("amountEur", ascending=False)
        ```

    Args:
        **lookup: A dataset lookup `dataset="my_dataset"`
        **lookup: A schema lookup `schema="Person"`
        **lookup: `include_descendants=True`: Include schema descendants for
            given schema lookup
        **lookup: `include_matchable=True`: Include matchable schema for
            given schema lookup
        **lookup: `prop`, `value` and optionally `comparator`: An explicit
            property lookup
        **lookup: A property=value lookup (with optional comparators):
            `name__startswith="Ja"`

    Returns:
        The updated `Query` instance

    Raises:
        ValidationError: If `prop` is given without a `value`
    """

    def _replace(new_filter) -> None:
        # replace existing property filter with updated one
        self.filters.discard(new_filter)
        self.filters.add(new_filter)

    include_descendants = lookup.pop("include_descendants", False)
    include_matchable = lookup.pop("include_matchable", False)
    explicit_prop = lookup.pop("prop", None)
    explicit_value = lookup.pop("value", None)
    explicit_comparator = lookup.pop("comparator", None)
    if explicit_prop is not None:
        if explicit_value is None:
            raise ValidationError("No lookup value specified")
        _replace(PropertyFilter(explicit_prop, explicit_value, explicit_comparator))

    # first pass: lookups whose key starts with one of the `FILTERS` keys
    unknown: dict[str, Any] = {}
    for name, wanted in lookup.items():
        for prefix, filter_cls in FILTERS.items():
            if not name.startswith(prefix):
                continue
            if wanted is None:
                # a `None` value drops all filters of that class
                self.discard(filter_cls)
            else:
                base, operator = parse_comparator(name)
                extra = {}
                if base == "schema":
                    extra = {
                        "include_matchable": include_matchable,
                        "include_descendants": include_descendants,
                    }
                self.filters.add(filter_cls(wanted, operator, **extra))
            break
        else:
            # no prefix matched: handled as property lookup below
            unknown[name] = wanted

    # second pass: parse arbitrary `date__gte=2023` stuff
    for name, raw in unknown.items():
        for parsed in parse_unknown_filters((name, raw)):
            parsed_prop, parsed_value, parsed_comparator = parsed
            _replace(PropertyFilter(parsed_prop, parsed_value, parsed_comparator))

    return self._chain()