Source code for koboextractor

import requests
from typing import Any, Dict, List

[docs]class KoboExtractor: """Extracts collected data from KoBoToolbox. This class provides methods to connect to the kpi API of KoBoToolbox, access information about surveys, their questions, choices, and responses. Attributes: token: Your authentication token, which can be obtained from https://kf.kobotoolbox.org/token/. endpoint: The KoBoToolbox kpi API endpoint, e.g. https://kf.kobotoolbox.org/api/v2 or https://kobo.humanitarianresponse.info/api/v2. debug: Set to True to enable debugging output. Default: False. """ def __init__(self, token: str, endpoint: str, debug: bool = False) -> None: """Initialises the KoboExtractor with token and endpoint. Args: token: Your authentication token, which can be obtained from https://kf.kobotoolbox.org/token/. endpoint: The KoBoToolbox kpi API endpoint, e.g. https://kf.kobotoolbox.org/api/v2 or https://kobo.humanitarianresponse.info/api/v2. debug: Set to True to enable debugging output. Default: False. """ self.token = token self.endpoint = endpoint self.debug = debug pass
[docs] def list_assets(self) -> Dict[str, Any]: """Lists all assets (surveys). Lists all assets (surveys) in the associated KoBoToolbox account. Returns: A dict containing information about your assets. Log into KoBoToolbox and visit https://kf.kobotoolbox.org/api/v2/assets/ to see a description. """ url = f'{self.endpoint}/assets.json' headers = {'Authorization': f'Token {self.token}'} if self.debug: print(f'KoboExtractor.list_assets: Calling {url}') response = requests.get(url, headers=headers) return response.json()
[docs] def get_asset(self, asset_uid: str) -> Dict[str, Any]: """Gets information on an asset (survey). Gets all information on an asset (survey) in the associated KoBoToolbox account. Args: asset_uid: Unique ID of the asset. Obtainable e.g. through ``list_assets()['results'][i]['uid']`` (for your first asset, use ``i=0``). Returns: A dict containing information about your asset. Log into KoBoToolbox and visit https://kf.kobotoolbox.org/api/v2/assets/YOUR_ASSET_UID/ to see a description. """ url = f'{self.endpoint}/assets/{asset_uid}.json' headers = {'Authorization': f'Token {self.token}'} if self.debug: print(f'KoboExtractor.get_asset: Calling {url}') response = requests.get(url , headers=headers) return response.json()
[docs] def get_data(self, asset_uid, # type: str query=None, # type: str start=None, # type: int limit=None, # type: int submitted_after=None, # type: str ): # type: (...) -> Dict[str, Any] """Gets the data (responses) of an asset (survey). Gets all information on an asset (survey) in the associated KoBoToolbox account. Args: asset_uid: Unique ID of the asset. Obtainable e.g. through ``list_assets()['results'][i]['uid']`` (for your first asset, use i=0). query: Query string in the form ``'{"field":"value"}'`` or ``'{"field":{"op": "value"}}'``, e.g. ``'{"_submission_time": {"$gt": "2020-05-14T14:36:20"}}'``. See https://docs.mongodb.com/manual/reference/operator/query/ for operators. start: Index (zero-based) from which the results start (default: 0). limit: Number of results per page (max: 30000, default: 30000). submitted_after: Shorthand to query for submission time. String of date and time in ISO format (e.g. 2020-05-14T14:36:20, results) in query ``'{"_submission_time": {"$gt": "2020-05-14T14:36:20"}}'``. Ignored when combined with 'query'. Returns: A dict containing the data associated with the asset. For a survey asset, the key 'count' provides the number of responses. The key 'results' contains a list of responses. Each response is a dict with several metadata keys (such as '_submission_time') and key/value pairs for each answered question in the form of 'GROUP_CODE/QUESTION_CODE': 'ANSWER_CODE'. Log into KoBoToolbox and visit https://kf.kobotoolbox.org/api/v2/assets/YOUR_ASSET_UID/data/ for a more detailed description. """ if self.debug and query and submitted_after: print("KoboExtractor.get_data(): Ignoring argument " "'submitted_after' because 'query' is specified.") url = f'{self.endpoint}/assets/{asset_uid}/data.json' if query or start or limit or submitted_after: url += '?' if query: url += f'query={query}' elif submitted_after: url += f'query={{"_submission_time": {{"$gt": "{submitted_after}"}}}}' if (query or submitted_after) and (start or limit): url += '&' if start: url += f'start={start}' if limit: url += '&' if limit: url += f'limit={limit}' headers = {'Authorization': f'Token {self.token}'} if self.debug: print(f'KoboExtractor.get_data: Calling {url}') response = requests.get(url, headers=headers) return response.json()
[docs] def get_choices(self, asset, # type: Dict[str, Any] ): # type: (...) -> Dict[str, Dict[str, Dict[str, str]]] """Groups the choices (answer options) of a survey into a dict. Groups all the choices (answer options) of a survey into a dict, arranged by their list. A 'sequence' number is added to allow restoring the original order of the choices from the inherently unordered dict. Args: asset: A dict as returned by ``get_asset()``. Returns: A dict of the form:: { LIST_NAME: { 'label': CHOICE_LABEL, 'sequence': SEQUENCE_NUMBER } } where CHOICE_LABEL is the label (text) of the choice in the survey's default language, and SEQUENCE_NUMBER is an incrementing number that can be used to restore the order of the choices in the survey from this unordered dict. """ choice_lists = {} sequence = 0 for choice in asset['content']['choices']: if choice['list_name'] not in choice_lists: choice_lists[choice['list_name']] = {} if 'label' in choice: label = choice['label'][0] else: label = None choice_lists[choice['list_name']][choice['name']] = { 'label': label, 'sequence': sequence } sequence += 1 return choice_lists
[docs] def get_questions(self, asset, # type: Dict[str, Any] unpack_multiples, # type: bool ): # type: (...) -> Dict[str, Dict[str, Any]] """Groups the choices (answer options) of a survey into a dict. Groups all the choices (answer options) of a survey into a dict, arranged by their list. A 'sequence' number is added to allow restoring the original order of the choices from the inherently unordered dict. Args: asset: A dict as returned by ``get_asset()``. unpack_multiples: If True, the corresponding choices from ``get_choices()`` are added as subsequent questions following a multiple choice question (type 'select_multiple'). The type of these additional questions is set to 'select_multiple_option'. Returns: A dict of the form:: { 'groups': { GROUP_CODE: { 'label': GROUP_LABEL, 'sequence': SEQUENCE_NUMBER, 'repeat': True/False, 'questions': { QUESTION_CODE: { 'type': QUESTION_TYPE, 'sequence': SEQUENCE_NUMBER, 'label': QUESTION_LABEL, 'list_name': CHOICE_LIST_NAME, 'choices': { CHOICE_CODE: { 'label': CHOICE_LABEL, 'type': 'select_multiple_option', 'sequence': SEQUENCE_NUMBER } }, 'other': { 'type': '_or_other', 'label': 'Other', 'sequence': SEQUENCE_NUMBER } } }, 'groups': { GROUP_CODE: { ... } } }, 'questions': { QUESTION_CODE: { ... } } where GROUP_LABEL, QUESTION_LABEL and CHOICE_LABEL are the labels (text) of the group or question in the survey's default language. SEQUENCE_NUMBER is an incrementing number that can be used to restore the order of the questions in the survey from this unordered dict. Depending on the question, not all keys may be present. An additional question of the type '_or_other' is inserted after any question which type ends in '_or_other', to cover the reponses to such questions. """ if unpack_multiples: choices = self.get_choices(asset) sequence = 0 root_group = {} group_levels = [root_group] tmp_group = root_group for qn in asset['content']['survey']: # qn['name'] or qn['$autoname'] is the question code # Assuming every question has a type (so far it has been true) if 'name' in qn: name = qn['name'] elif '$autoname' in qn: name = qn['$autoname'] else: name = None if qn['type'] == 'begin_group' or qn['type'] == 'begin_repeat': # Adding new question groups if 'groups' not in tmp_group: tmp_group['groups'] = {} tmp_group['groups'][name] = {} group_levels.append(tmp_group) tmp_group = tmp_group['groups'][name] if qn['type'] == 'begin_repeat': tmp_group['repeat'] = True else: tmp_group['repeat'] = False if 'label' in qn: tmp_group['label'] = qn['label'][0] tmp_group['sequence'] = sequence sequence += 1 continue # Going one level up after a group ends if qn['type'] == 'end_group' or qn['type'] == 'end_repeat': tmp_group = group_levels.pop() continue # Assuming any other type is a question, assuming every question has a name or $autoname if 'questions' not in tmp_group: tmp_group['questions'] = {} assert name, 'Found question without name nor $autoname!' # Adding new questions to the current group new_question = {} new_question['type'] = qn['type'] new_question['sequence'] = sequence sequence += 1 if 'label' in qn: new_question['label'] = qn['label'][0] if 'select_from_list_name' in qn: new_question['list_name'] = qn['select_from_list_name'] if unpack_multiples and qn['type'] == 'select_multiple': list_name = qn['select_from_list_name'] new_choices = {} sorted_choices = sorted(choices[list_name].items(), key=lambda choice: choice[1]['sequence']) for choice in sorted_choices: new_choices[choice[0]] = {} new_choices[choice[0]]['label'] = choice[1]['label'] new_choices[choice[0]]['type'] = 'select_multiple_option' new_choices[choice[0]]['sequence'] = sequence sequence += 1 new_question['choices'] = new_choices if '_or_other' in qn and qn['_or_other']: # TODO: This needs some testing new_question['other'] = { 'type': '_or_other', 'label': 'Other', 'sequence': sequence } sequence += 1 tmp_group['questions'][name] = new_question return root_group
[docs] def sort_results_by_time(self, unsorted_results, # type: List[Dict[str, Any]] reverse=False, # type: bool ): # type: (...) -> List[Dict[str, Any]] """Sorts an unordered list of responses by their submission time. Sorts a list of responses in random order (e.g. as obtained by ``get_data(asset_uid)['results']`` by the value of their ``_submission_time`` key. Example:: from koboextractor import KoboExtractor kobo = KoboExtractor(KOBO_TOKEN, 'https://kf.kobotoolbox.org/api/v2') assets = kobo.list_assets() asset_uid = assets['results'][0]['uid'] new_data = kobo.get_data(asset_uid) new_results = kobo.sort_results_by_time(new_data['results']) Args: unsorted_results: A list of results as returned by ``kobo.get_data(asset_uid)['results']``. reverse: If True, sort in descending order. Default: False. Returns: A list of results as provided in ``unsorted_results``, but sorted by the value of their ``_submission_time`` key. """ sorted_results = sorted(unsorted_results, key=lambda result: result['_submission_time'], reverse=reverse) return sorted_results
[docs] def label_result(self, unlabeled_result, # type: Dict[str, Any] choice_lists, # type: Dict[str, Dict[str, str]] questions, # type: Dict[str, Dict[str, Any]] unpack_multiples, # type: bool ): # type: (...) -> Dict[str, Any] """Adds labels for questions and answers to a response. Adds labels corresponding the the question group codes, question codes and answer codes to a response. Example: :: from KoboExtractor import KoboExtractor kobo = KoboExtractor(KOBO_TOKEN, 'https://kf.kobotoolbox.org/api/v2') assets = kobo.list_assets() asset_uid = assets['results'][0]['uid'] asset = kobo.get_asset(asset_uid) choice_lists = kobo.get_choices(asset) questions = kobo.get_questions(asset=asset, unpack_multiples=True) asset_data = kobo.get_data(asset_uid) results = kobo.sort_results_by_time(asset_data['results']) labeled_results = [] for result in results: labeled_results.append(kobo.label_result(unlabeled_result=result, choice_lists=choice_lists, questions=questions, unpack_multiples=True)) Args: unlabeled_result: A single result (dict) of the form:: { (GROUP_CODES)/)QUESTION_CODE: ANSWER_CODE, (GROUP_CODE(S)/)REPEAT_GROUP_CODE: [ { (GROUP_CODE(S)/)REPEAT_GROUP_CODE/(GROUP_CODE(S)/)QUESTION_CODE: ANSWER_CODE, (GROUP_CODE(S)/)REPEAT_GROUP_CODE/(GROUP_CODE(S)/)REPEAT_GROUP_CODE: [ ... ] } ], METADATA_KEY: METADATA_VALUE } (e.g. one of the list items in ``get_data(asset_uid)['results']``). choice_lists: Dict of choice lists as returned by ``get_choices(asset)``. questions: Dict of questions as returned by ``get_questions(asset)`` unpack_multiples: If True, the corresponding choices from ``get_choices()`` are added as subsequent questions following a multiple choice question (type 'select_multiple'). Returns: A dict of the form:: { 'meta': { 'start': '2020-05-15T08:07:24.705+08:00', '_version_': 'vf4kqJPWTbsMrZSw5RZQ7H', '_submission_time': '2020-05-15T00:17:51', ... }, results: { (GROUP_CODE(S)/)QUESTION_CODE: { 'label': 'Question label', 'answer_code': ANSWER_CODE, 'answer_label': 'Answer label', 'sequence': QUESTION_SEQUENCE, 'choices': { 'CHOICE_CODE': { 'sequence': CHOICE_SEQUENCE, 'label': CHOICE_LABEL, 'answer_code': 0 or 1, 'answer_label': 'Yes' or 'No' } } }, (GROUP_CODE(S)/)REPEAT_GROUP_CODE: { 0: { (GROUP_CODE(S)/)QUESTION_CODE: { 'label': 'Question label', 'answer_code': ANSWER_CODE, 'answer_label': 'Answer label', 'sequence': QUESTION_SEQUENCE }, (GROUP_CODE(S)/)QUESTION_CODE: { ... }, ... }, 1: { ... } }, ... } } () denote optional parts, depending on how deep the groups are nested. QUESTION_SEQUENCE reflects the order of the questions (and choices) in the survey. """ def label_question(group_codes, question_code, value, questions, choice_lists, unpack_multiples): def unlabeled_question(question_code, value): return { 'label': question_code, 'answer_code': value, 'answer_label': value } # Add and label the question result_qn = {} tmp_qn = questions for group_code in group_codes: if group_code in tmp_qn['groups']: tmp_qn = tmp_qn['groups'][group_code] else: # cannot find this question return unlabeled_question(question_code, value) if not ('questions' in tmp_qn and question_code in tmp_qn['questions']): # cannot find this question return unlabeled_question(question_code, value) tmp_qn = tmp_qn['questions'][question_code] if 'label' not in tmp_qn: # cannot label this question and answer return unlabeled_question(question_code, value) result_qn['sequence'] = tmp_qn['sequence'] result_qn['label'] = tmp_qn['label'] result_qn['answer_code'] = value if tmp_qn['type'] == 'select_one': try: # get answer label from choice list list_name = tmp_qn['list_name'] result_qn['answer_label'] = choice_lists[list_name][value]['label'] except KeyError: # Cannot label this answer result_qn['answer_label'] = value elif tmp_qn['type'] == 'select_multiple': list_name = tmp_qn['list_name'] try: # get individual answer labels from choice list and # concatenate them into this question's answer label answer_label = '' for split_answer_code in value.split(): answer_label = answer_label + choice_lists[list_name][split_answer_code]['label'] + ';' result_qn['answer_label'] = answer_label except KeyError: # Cannot label this answer result_qn['answer_label'] = value if unpack_multiples: # TODO: Should this really be optional? # unpack the individual choices result_qn['choices'] = {} for choice_code, choice_dict in choice_lists[list_name].items(): result_qn['choices'][choice_code] = {} result_qn['choices'][choice_code]['sequence'] = tmp_qn['choices'][choice_code]['sequence'] result_qn['choices'][choice_code]['label'] = choice_dict['label'] result_qn['choices'][choice_code]['answer_code'] = (int) (choice_code in value.split()) result_qn['choices'][choice_code]['answer_label'] = 'Yes' if choice_code in value.split() else 'No' else: # no special treatment for simple types of questions result_qn['answer_label'] = value return result_qn def label_repeat_group(outer_group_codes, repeat_list, questions, choice_lists, unpack_multiples): repeat_group = {} i = 0 for repeat_set in repeat_list: repeat_group[i] = {} for key, value in repeat_set.items(): inner_group_codes = key.split('/') for outer_group_code in outer_group_codes: inner_group_codes.remove(outer_group_code) if isinstance(value, list): # repeat group repeat_group[i]['/'.join(inner_group_codes)] = label_repeat_group(outer_group_codes + inner_group_codes, value, questions, choice_lists, unpack_multiples) else: # (QUESTION_GROUP(S)/)QUESTION_CODE type question_code = inner_group_codes.pop() repeat_group[i]['/'.join(inner_group_codes+[question_code])] = label_question(outer_group_codes + inner_group_codes, question_code, value, questions, choice_lists, unpack_multiples) i += 1 return repeat_group ################################ meta_keys_start = ( '_', 'meta/', 'formhub/', 'simserial', 'phonenumber', 'start', 'end', 'today', 'username', 'deviceid', 'subscriberid' ) meta = {} results = {} for key, value in unlabeled_result.items(): # there are various keys within an unlabeled_result dict, and not # all of them belong to survey questions: # key starts with meta_keys_start -> metadata # otherwise, key points to a list -> (GROUP_CODE(S)/)REPEAT_GROUP_CODE type # otherwise -> (GROUP_CODE(S)/)/QUESTION_CODE type (number of '/' -> number of nested groups) # otherwise -> groupless question if key.startswith(meta_keys_start): meta[key] = value continue if isinstance(value, list): # repeat group group_codes = key.split('/') results['/'.join(group_codes)] = label_repeat_group(group_codes, value, questions, choice_lists, unpack_multiples) else: # (GROUP_CODE(S)/)/QUESTION_CODE type (number of '/' -> number of nested groups) group_codes = key.split('/') question_code = group_codes.pop() results[key] = label_question(group_codes, question_code, value, questions, choice_lists, unpack_multiples) return { 'meta': meta, 'results': results }