|
a |
|
b/tests/autocomplete/language_server.py |
|
|
1 |
""" |
|
|
2 |
Initialize and provide a thin wrapper around a pyright language server |
|
|
3 |
|
|
|
4 |
The initialization dance was inferred from the source code of the pyright |
|
|
5 |
playground client: |
|
|
6 |
https://github.com/erictraut/pyright-playground/blob/main/server/src/lspClient.ts |
|
|
7 |
|
|
|
8 |
Another good resource is the specification for the Language Server Protocol: |
|
|
9 |
https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification |
|
|
10 |
though it's a bit hard to navigate. |
|
|
11 |
""" |
|
|
12 |
|
|
|
13 |
import json |
|
|
14 |
import os |
|
|
15 |
import re |
|
|
16 |
import sys |
|
|
17 |
from pathlib import Path |
|
|
18 |
from subprocess import PIPE, Popen |
|
|
19 |
|
|
|
20 |
|
|
|
21 |
class LanguageServer:
    """
    Thin wrapper around a `pyright-langserver --stdio` subprocess.

    Constructing an instance starts the server, performs the LSP
    initialization handshake and opens a temporary document. After that the
    public `get_*` methods can be used to request completions and hover
    (type) information for that document.

    Messages are exchanged over the subprocess's stdin/stdout using the LSP
    framing: a `Content-Length: N` header, a blank line, then N bytes of
    JSON-RPC.

    Call `close()` (or use the instance as a context manager) to stop the
    subprocess when finished.
    """

    def __init__(self, temp_file_path: Path):
        """
        Start the language server and get it ready for requests.

        temp_file_path: file that is (over)written with placeholder content
            and registered with the server as the open document.
        """
        self._message_id = 0

        # The pyright-langserver needs the ehrql repo directory on
        # PYTHONPATH so it can understand ehrql, and it needs the directory
        # of the current python executable (which is also where
        # pyright-langserver is installed) on its PATH variable.
        env = os.environ.copy()
        # PYTHONPATH takes filesystem paths, not file:// URIs
        env["PYTHONPATH"] = str(Path("").absolute())
        env["PATH"] = os.path.dirname(sys.executable)

        # NOTE(review): stderr is piped but never read by this class
        self._language_server = Popen(
            ["pyright-langserver", "--stdio"],
            stdin=PIPE,
            stdout=PIPE,
            stderr=PIPE,
            env=env,
        )
        # Server immediately emits two log messages:
        # {'method': 'window/logMessage', 'params': { 'message': 'Pyright language server 1.1.396 starting'}}
        # {'method': 'window/logMessage', 'params': { 'message': 'Server root directory: file:...'}}
        self._read_messages(2)

        # Send an "initialize" message
        self._send_message(
            "initialize",
            {
                "processId": os.getpid(),
                "rootUri": Path("").absolute().as_uri(),
                "capabilities": {
                    "textDocument": {
                        "hover": {
                            # Can also pass in "markdown" in this list as well as "plaintext"
                            # markdown is what vscode uses, but for testing purposes it's
                            # slightly easier to compare the plain text results
                            "contentFormat": ["plaintext"],
                        },
                    },
                },
            },
        )

        # Confirm with "initialized" notification
        self._send_notification("initialized", {})
        # This notification needs to be sent as well
        self._send_notification("workspace/didChangeConfiguration", {})

        # Now we create a temp file which we will amend in each test and then
        # call the language server to get completions. For reasons I can't quite
        # figure out, the initial content needs to have a second, non-empty line
        # otherwise some of the tests fail
        self.open_doc(temp_file_path, "Line 1\nLine 2")

        # Server now emits a number of "window/logMessage" messages (was 6 prior to
        # pyright v1.1.393, but is now 7), finishing with a
        # "textDocument/publishDiagnostics" message. So we read the messages until
        # we get to that one.
        self._read_until_specific_method("textDocument/publishDiagnostics")

        # The server is now ready for completion and hover requests

    def __enter__(self):  # pragma: no cover
        """Allow `with LanguageServer(path) as server:` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):  # pragma: no cover
        """Stop the subprocess when leaving the `with` block."""
        self.close()

    def close(self):  # pragma: no cover
        """
        Stop the language server subprocess and close its pipes.

        Safe to call when the process has already exited.
        """
        from subprocess import TimeoutExpired

        server = self._language_server
        if server.poll() is None:
            server.terminate()
            try:
                server.wait(timeout=5)
            except TimeoutExpired:
                # Did not exit after terminate(), so force it
                server.kill()
                server.wait()
        for pipe in (server.stdin, server.stdout, server.stderr):
            if pipe is not None:
                pipe.close()

    def _next_id(self):
        """Return the next value of the incrementing message counter."""
        self._message_id += 1
        return self._message_id

    def get_completion_results_from_file(self, line, cursor_position):
        """
        For a given line and cursor_position returns the list of potential
        completion results. This assumes that the file is already loaded by
        the language server (via `open_doc`). If you want to test a single line
        of code, then use the `get_completion_results()` method instead
        """
        completion_response = self._get_completion(line, cursor_position)
        results = completion_response.get("result")
        return results.get("items")

    def get_completion_results(
        self, text_for_completion, cursor_position=None
    ):  # pragma: no cover
        """
        For a given string of text provide the list of potential completion
        results. If cursor_position is omitted, then it looks for completion
        at the end of the text provided.

        To use, you should provide the entire file contents up to the point
        that you want to check autocomplete. It should all be on a single
        line, with ';' separators.

        Returns a list of completion items which look like this:
        { "label": str, "kind": CompletionKind, "sortText": str}
        """
        self._notify_document_change(0, text_for_completion)

        if cursor_position is None:
            cursor_position = len(text_for_completion)

        return self.get_completion_results_from_file(0, cursor_position)

    def get_element_type_from_file(self, line, cursor_position):
        """
        For a given line and cursor_position returns the type of the currently
        hovered piece of text. This assumes that the file is already loaded by
        the language server (via `open_doc`). If you want to test a single line
        of code, then use the `get_element_type()` method instead

        Raises AssertionError if the hover text is not a recognised variable,
        property or method signature.
        """
        hover_response = self._get_hover_text(line, cursor_position)
        value = hover_response.get("result").get("contents").get("value")
        first_line = value.split("\n\n")[0]

        # First line contains the signature like `(variable) name: type`
        variable_signature = re.search(
            r"^\((?:variable|property)\) (?P<var_name>[^:]+): (?P<type>.+)",
            first_line,
        )
        if variable_signature:
            return variable_signature.group("type")

        # First line contains the signature like `(method) def method_name() -> type`
        method_signature = re.search(
            r"^\(method\) def (?P<method_name>[^(]+)\([^()]*\) -> (?P<type>.+)",
            first_line,
        )
        if method_signature:
            return method_signature.group("type")

        # Raised explicitly (rather than `assert 0`) so the failure is still
        # reported when python runs with -O
        raise AssertionError(  # pragma: no cover
            f"The type signature `{value}` could not be parsed by language_server.py."
        )

    def get_element_type(self, text, cursor_position=None):  # pragma: no cover
        """
        For a given string of text provide the inferred type of the item at the
        current cursor_position. If cursor_position is omitted, it assumes the
        thing to check is the last thing typed and so looks at the cursor position
        just before the end of the string.

        To use, you should provide the entire file contents up to the point
        that you want to get the type. It should all be on a single
        line, with ';' separators.
        """
        self._notify_document_change(0, f"{text}\n")

        if cursor_position is None:
            cursor_position = len(text) - 1

        return self.get_element_type_from_file(0, cursor_position)

    def _notify_document_change(self, line_number, new_text):  # pragma: no cover
        """
        Tell the server that `line_number` of the open document now holds
        `new_text`, then read the one message sent back.

        The replaced range runs from character 0 to `len(new_text)` on that
        line.
        """
        # Need to update text_document version
        # NOTE(review): the LSP spec types `version` as an integer; the
        # string form is kept as-is - confirm before changing
        self._text_document["version"] = f"v{self._next_id()}"
        self._send_notification(
            "textDocument/didChange",
            {
                "textDocument": self._text_document,
                "contentChanges": [
                    {
                        "range": {
                            "start": {"line": line_number, "character": 0},
                            "end": {
                                "line": line_number,
                                "character": len(new_text),
                            },
                        },
                        "text": new_text,
                    }
                ],
            },
        )
        # This notification causes a response to be sent, so we
        # need to read it to clear it from stdout
        self._read_message()

    def _get_completion(self, line_number, character):
        """
        Get the list of completion objects for a given position
        """
        return self._send_message(
            "textDocument/completion",
            {
                "textDocument": self._text_document,
                "position": {"line": line_number, "character": character},
            },
        )

    def _get_hover_text(self, line_number, character):
        """
        Get the inferred type
        """
        return self._send_message(
            "textDocument/hover",
            {
                "textDocument": self._text_document,
                "position": {"line": line_number, "character": character},
            },
        )

    def _read_message(self):
        """
        Read a message from the language server.

        Message format is a header like "Content-Length: xxx", followed by
        \\r\\n\\r\\n then the message in JSON RPC. Any lines before the
        Content-Length header, and any further header lines after it, are
        skipped.

        Raises EOFError if the server's stdout is closed before a complete
        message has been read.
        """
        stdout = self._language_server.stdout

        raw_line = stdout.readline()
        while not raw_line.decode("utf-8").strip().startswith(
            "Content-Length:"
        ):  # pragma: no cover
            if not raw_line:
                # readline() returns b"" only at end of stream; without this
                # check the loop would spin forever once the server has died
                raise EOFError(
                    "Language server closed its output before sending a message"
                )
            raw_line = stdout.readline()
        content_length = int(raw_line.decode("utf-8").split(":", 1)[1].strip())

        # Consume the rest of the header section, up to and including the
        # blank separator line
        while stdout.readline().strip():  # pragma: no cover
            pass

        body = stdout.read(content_length)
        if len(body) != content_length:  # pragma: no cover
            raise EOFError(
                "Language server closed its output in the middle of a message"
            )
        return json.loads(body.decode("utf-8"))

    def _read_messages(self, number_of_messages=1):
        """Read multiple messages from the language server"""
        return [self._read_message() for _ in range(number_of_messages)]

    def _read_until_specific_method(self, method):
        """Reads messages from the server until we get the one we're looking for"""
        messages = [self._read_message()]
        # .get() because responses to requests carry no "method" key
        while messages[-1].get("method") != method:
            messages.append(self._read_message())
        return messages

    def _send(self, message):
        """Serialize `message` as JSON and write it, framed, to the server's stdin."""
        body = json.dumps(message).encode("utf-8")
        # Content-Length counts bytes, so measure the encoded body
        header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
        self._language_server.stdin.write(header + body)
        self._language_server.stdin.flush()

    def _send_notification(self, method, params):
        """Send a notification to the language server - there is no response."""
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        }
        self._send(notification)

    def _send_message(self, method, params):
        """
        Send a message to the language server and return the response.

        The "response" is simply the next message read from the server.
        """
        message = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
            "params": params,
        }
        self._send(message)
        return self._read_message()

    def open_doc(self, temp_file_path, content):
        """
        Tell the language server about a file we have "opened" so that
        it can parse it to make the autocomplete responses faster

        Writes `content` to `temp_file_path`, records the document details
        used by later requests, sends "textDocument/didOpen" and reads one
        message back from the server.
        """
        temp_file_path.write_text(content, encoding="utf-8")
        temp_file_uri = temp_file_path.absolute().as_uri()

        self._text_document = {
            "uri": temp_file_uri,
            "languageId": "python",
            "version": 1,
            "text": content,
        }
        self._send_notification(
            "textDocument/didOpen",
            {"textDocument": self._text_document},
        )

        self._read_message()