[9d3784]: / aiagents4pharma / talk2scholars / tools / zotero / utils / zotero_path.py

Download this file

189 lines (149 with data), 6.4 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#!/usr/bin/env python3
"""
Utility functions for Zotero path operations.
"""
import logging
# Configure logging. Because these statements sit at module level they run
# when this module is imported: basicConfig() is asked for INFO-level output,
# and `logger` is the module-scoped logger used by every function below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# pylint: disable=broad-exception-caught
def get_item_collections(zot):
    """
    Fetch all Zotero collections and map item keys to their full collection paths.

    Args:
        zot (Zotero): An initialized Zotero client. Only ``collections()`` and
            ``collection_items(collection_key)`` are called on it.

    Returns:
        dict: A dictionary mapping item keys to a list of full collection paths
            (for example ``"/Parent/Child"``), one entry per collection that
            contains the item, in the order the collections were returned.
    """
    logger.info("Fetching Zotero collections...")

    # Fetch all collections.
    # NOTE(review): only the result of a single collections() call is used; if
    # the client pages its results, large libraries may be truncated - confirm.
    collections = zot.collections()

    # Create mappings: collection key -> name and collection key -> parent key
    collection_map = {col["key"]: col["data"]["name"] for col in collections}
    parent_map = {
        col["key"]: col["data"].get("parentCollection") for col in collections
    }

    def build_collection_path(col_key):
        """Build the "/"-prefixed path of a collection by walking up its parents."""
        names = []
        visited = set()
        # Stop at a falsy parent (top level) or at a key already seen, so that
        # malformed data with cyclic parent links cannot loop forever.
        while col_key and col_key not in visited:
            visited.add(col_key)
            # A parent key that is not among the fetched collections is
            # rendered as "Unknown" and ends the walk on the next iteration.
            names.append(collection_map.get(col_key, "Unknown"))
            col_key = parent_map.get(col_key)
        names.reverse()  # collected leaf -> root; paths read root -> leaf
        return "/" + "/".join(names)  # Convert to "/path/to/collection"

    collection_paths = {key: build_collection_path(key) for key in collection_map}

    # Build the item-to-collection mapping with full paths
    item_to_collections = {}
    for collection in collections:
        collection_key = collection["key"]
        collection_path = collection_paths[collection_key]
        # Fetch items in the collection
        for item in zot.collection_items(collection_key):
            item_to_collections.setdefault(item["data"]["key"], []).append(
                collection_path
            )

    logger.info("Successfully mapped items to collection paths.")
    return item_to_collections
def find_or_create_collection(zot, path, create_missing=False):
    """
    Find a Zotero collection by path, optionally creating it when missing.

    Matching is done on collection names, case-insensitively. Only the last
    path segment (the target) and the one before it (its parent) are looked
    at; the rest of the hierarchy is not verified. When several collections
    share the target name, the one whose parent is the named parent is
    preferred, otherwise the first name match is returned.

    Args:
        zot (Zotero): An initialized Zotero client. ``collections()`` is always
            called; ``create_collection(...)`` only when a collection is created.
        path (str): Collection path such as ``"/Parent/Child"``. Leading and
            trailing slashes and empty segments are ignored.
        create_missing (bool): Create the target collection (under the named
            parent, if that parent was found) when no collection matches.

    Returns:
        str or None: Key of the matching or newly created collection; None if
            the path is empty, nothing matched and ``create_missing`` is False,
            or creation failed.
    """
    logger.info(
        "Finding collection for path: %s (create_missing=%s)", path, create_missing
    )

    # Keep the caller's spelling for creating a collection; names are compared
    # in lower case for lookup. Empty segments ("a//b") are dropped.
    display_parts = [part for part in (path or "").strip("/").split("/") if part]
    path_parts = [part.lower() for part in display_parts]
    if not path_parts:
        logger.warning("Empty path provided")
        return None

    # Get all collections from Zotero
    all_collections = zot.collections()
    logger.info("Found %d collections in Zotero", len(all_collections))

    # Determine target name (last part) and, if nested, find the parent's key
    target_name = path_parts[-1]
    parent_key = None
    if len(path_parts) > 1:
        parent_name = path_parts[-2]
        # First collection whose name matches the parent (case-insensitive)
        parent_key = next(
            (
                col["key"]
                for col in all_collections
                if col["data"]["name"].lower() == parent_name
            ),
            None,
        )

    # Look for existing collections with the target name
    matches = [
        col for col in all_collections if col["data"]["name"].lower() == target_name
    ]
    if matches:
        match = matches[0]
        if parent_key:
            # Prefer the match that actually sits under the requested parent;
            # fall back to the first name match when none does.
            match = next(
                (
                    col
                    for col in matches
                    if col["data"].get("parentCollection") == parent_key
                ),
                match,
            )
        logger.info("Found direct match for %s: %s", target_name, match["key"])
        return match["key"]

    # No match found: create one only if allowed
    if not create_missing:
        logger.warning("No matching collection found for %s", target_name)
        return None

    new_name = display_parts[-1]  # original casing, not the lower-cased form
    payload = {"name": new_name}
    if parent_key:
        payload["parentCollection"] = parent_key
    try:
        # The collection-creation call takes a list of collection dicts.
        result = zot.create_collection([payload])
        # Interpret result based on structure
        if "success" in result:
            new_key = result["success"]["0"]
        else:
            new_key = result["successful"]["0"]["data"]["key"]
    except Exception as e:
        # Best effort: any client/response error is logged and reported as None.
        logger.error("Failed to create collection: %s", e)
        return None

    logger.info("Created collection %s with key %s", new_name, new_key)
    return new_key
def get_all_collection_paths(zot):
    """
    Get all available collection paths in Zotero.

    Args:
        zot (Zotero): An initialized Zotero client. Only ``collections()`` is
            called on it.

    Returns:
        list: List of all available collection paths (for example
            ``"/Parent/Child"``), one per collection, in the order the
            collections were returned.
    """
    logger.info("Getting all collection paths")
    collections = zot.collections()

    # Create mappings: collection key -> name and collection key -> parent key
    collection_map = {col["key"]: col["data"]["name"] for col in collections}
    parent_map = {
        col["key"]: col["data"].get("parentCollection") for col in collections
    }

    def build_collection_path(col_key):
        """Build the "/"-prefixed path of a collection by walking up its parents."""
        names = []
        visited = set()
        # Stop at a falsy parent (top level) or at a key already seen, so that
        # malformed data with cyclic parent links cannot loop forever.
        while col_key and col_key not in visited:
            visited.add(col_key)
            # A parent key that is not among the fetched collections is
            # rendered as "Unknown" and ends the walk on the next iteration.
            names.append(collection_map.get(col_key, "Unknown"))
            col_key = parent_map.get(col_key)
        names.reverse()  # collected leaf -> root; paths read root -> leaf
        return "/" + "/".join(names)

    collection_paths = [build_collection_path(key) for key in collection_map]
    logger.info("Found %d collection paths", len(collection_paths))
    return collection_paths
def fetch_papers_for_save(state):
    """
    Look up the most recently displayed papers in the agent state.

    The ``last_displayed_papers`` entry is either the papers object itself or
    a string naming another state key under which the papers are stored.

    Args:
        state (dict): The state containing previously fetched papers.

    Returns:
        dict: The papers to save, or None when the entry is missing/empty or
            resolves to nothing.
    """
    logger.info("Fetching papers from state for saving")

    pointer = state.get("last_displayed_papers", "")
    if not pointer:
        logger.warning("No last_displayed_papers key in state")
        return None

    if not isinstance(pointer, str):
        # The entry already holds the papers object.
        papers = pointer
        logger.info("Using papers directly from last_displayed_papers")
    else:
        # The entry names another state key; follow it.
        papers = state.get(pointer, {})
        logger.info("Using papers from '%s' state key", pointer)

    if papers:
        return papers

    logger.warning("No fetched papers found to save.")
    return None