Fink client v8



We are delighted to announce the release of Fink client version 8. The Fink client, available at fink-client, is a handy package that enables programmatic manipulation of alerts generated by the Fink broker. It is particularly useful in the context of the two primary Fink services: Livestream and Data Transfer.

This new major version boasts a simplified installation process and introduces new features to the Livestream service. For more information, please refer to the Livestream manual. Here, we will provide some highlights.

Streamlined Installation Procedure

In previous versions, the installation process was complicated due to the need to freeze an old version of fastavro. However, thanks to a recent contribution from one of our colleagues at AMPEL (see pull request fastavro#738), we can now upgrade to more recent versions. As a result, installing the client is now as simple as:

# get or upgrade to the latest version
pip install -U fink-client

Checking offsets

It might be beneficial to verify your position on the various queues by obtaining the offsets for each topic you are monitoring.

fink_consumer --display_statistics

Topic [Partition]                                   Committed        Lag
========================================================================
fink_sso_ztf_candidates_ztf  [4]                            1        972
------------------------------------------------------------------------
Total for fink_sso_ztf_candidates_ztf                       1        972
------------------------------------------------------------------------

Topic [Partition]                                   Committed        Lag
========================================================================
------------------------------------------------------------------------
Total for fink_sso_fink_candidates_ztf                      0          2
------------------------------------------------------------------------

In this example, I have two topics, fink_sso_ztf_candidates_ztf and fink_sso_fink_candidates_ztf.

For the first topic, a single partition (number [4]) on the remote Kafka cluster is actively providing data. I have retrieved a single alert (the Committed column), and 972 alerts are still left to be fetched (the Lag column). Since there is only one active partition in this case, the totals simply repeat the per-partition values.

For the second topic, no alerts have been committed yet, meaning I have not yet started polling data.
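The relationship between the Committed and Lag columns can be sketched in plain Python. The function below is purely illustrative (it is not part of fink-client), and the offset numbers are hypothetical values chosen to match the statistics above:

```python
# Conceptual sketch: for each partition,
#   Lag = log-end offset (newest alert available on the broker)
#       - committed offset (last alert you have acknowledged).

def compute_lag(committed: dict, log_end: dict) -> dict:
    """Return per-partition lag, given committed and log-end offsets."""
    return {p: log_end[p] - committed.get(p, 0) for p in log_end}

# Hypothetical offsets for fink_sso_ztf_candidates_ztf:
# one active partition (number 4), 1 alert committed, 973 available.
committed = {4: 1}
log_end = {4: 973}
print(compute_lag(committed, log_end))  # {4: 972}
```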

Resetting offsets

Occasionally, you may want to re-fetch alerts, which involves restarting the polling process from the beginning of a queue. To accomplish this, you can use the following:

fink_consumer --display -start_at earliest
Resetting offsets to BEGINNING
...
assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None}
...
assign TopicPartition{topic=fink_sso_ztf_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None}
...
# poll restarts at the first offset

All topic partitions will be reset to their initial offset, which is 0 in this case. Likewise, you can skip everything still in the queues and restart polling from the most recent offset:

fink_consumer --display -start_at latest
...
assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None}
...
assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=4,offset=2,leader_epoch=None,error=None}
...
assign TopicPartition{topic=fink_sso_ztf_candidates_ztf,partition=4,offset=973,leader_epoch=None,error=None}
...
No alerts the last 10 seconds
...

Empty partitions have their offsets set to 0, while the other partitions keep their offsets at the latest available value. The client then waits for new data to arrive. Note that the reset only takes effect at the next poll, so running the command fink_consumer --display_statistics immediately after the reset will not show the updated offsets.
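The semantics of the two reset modes can be summarised in a short, illustrative sketch (again, not fink-client code): earliest rewinds every partition to offset 0, while latest jumps each partition to its current log-end offset, so only new alerts will be polled. The offsets used are hypothetical:

```python
# Conceptual sketch of the two offset-reset modes.

def reset_offsets(log_end: dict, start_at: str) -> dict:
    """Return the new per-partition offsets after a reset."""
    if start_at == "earliest":
        # Rewind everything: re-fetch the whole queue.
        return {p: 0 for p in log_end}
    if start_at == "latest":
        # Skip everything already in the queue; wait for new alerts.
        return dict(log_end)
    raise ValueError("start_at must be 'earliest' or 'latest'")

# Hypothetical log-end offsets: one empty partition, one with 973 alerts.
log_end = {0: 0, 4: 973}
print(reset_offsets(log_end, "earliest"))  # {0: 0, 4: 0}
print(reset_offsets(log_end, "latest"))    # {0: 0, 4: 973}
```

Note how, with latest, the empty partition keeps offset 0 while the other jumps to the end of its queue, matching the output shown above.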

This feature is especially helpful when addressing issues with a topic, such as malformed alerts having been pushed, and you need a clean restart.

Troubleshooting schema mismatch

A typical schema-mismatch error looks like this:

Traceback (most recent call last):
  File "/Users/julien/anaconda3/bin/fink_consumer", line 10, in <module>
    sys.exit(main())
  File "/Users/julien/Documents/workspace/myrepos/fink-client/fink_client/scripts/fink_consumer.py", line 92, in main
    topic, alert = consumer.poll(timeout=maxtimeout)
  File "/Users/julien/Documents/workspace/myrepos/fink-client/fink_client/consumer.py", line 94, in poll
    alert = _decode_avro_alert(avro_alert, self._parsed_schema)
  File "/Users/julien/Documents/workspace/myrepos/fink-client/fink_client/avroUtils.py", line 381, in _decode_avro_alert
    return fastavro.schemaless_reader(avro_alert, schema)
  File "fastavro/_read.pyx", line 835, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 846, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 561, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 456, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 559, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 431, in fastavro._read.read_union
  File "fastavro/_read.pyx", line 555, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 349, in fastavro._read.read_array
  File "fastavro/_read.pyx", line 561, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 456, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 559, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 405, in fastavro._read.read_union
IndexError: list index out of range

This error happens when the schema used to decode the alert does not match the alert content. This should not normally happen, as the schema is included in the alert payload. If it does happen though, you can force a schema:

fink_consumer [...] -schema [path_to_a_good_schema]

If you do not have a replacement schema, you can save the current (faulty) schema contained within an alert packet:

fink_consumer -limit 1 --dump_schema

You will see the traceback above, followed by the message:

Schema saved as schema_2024-06-03T11:12:36.855544+00:00.json

Then you can inspect the schema manually, or open an issue on the fink-client repository by attaching this schema to your message.
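Avro schemas are plain JSON documents with a top-level "fields" list, so the standard json module is enough to take a first look at a dumped schema. The snippet below inlines a small sample schema standing in for the dumped file; the two fields shown are illustrative, not the full ZTF alert schema:

```python
import json

# Inline sample standing in for a dumped schema file such as
# schema_<timestamp>.json produced by fink_consumer --dump_schema.
schema_text = """
{"type": "record", "name": "alert",
 "fields": [
   {"name": "objectId", "type": "string"},
   {"name": "candid", "type": ["long", "null"]}
 ]}
"""

schema = json.loads(schema_text)

# List the field names and types declared by the schema.
for field in schema["fields"]:
    print(field["name"], "->", field["type"])
```

To inspect a real dumped file, replace the inline string with `json.load(open(path))`.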

What's next?

We are preparing for the start of LSST. Many functionalities are under development (automation of workflows, a web client, centralised authentication with other Fink services, …), but let us know what you would like to see!