a b/tests/autocomplete/language_server.py
1
"""
2
Initialize and provide a thin wrapper around a pyright language server
3
4
The initialization dance was inferred from the source code of the pyright
5
playground client:
6
https://github.com/erictraut/pyright-playground/blob/main/server/src/lspClient.ts
7
8
Another good resource is the specification for the Language Server Protocol:
9
https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification)
10
though it's a bit hard to navigate.
11
"""
12
13
import json
14
import os
15
import re
16
import sys
17
from pathlib import Path
18
from subprocess import PIPE, Popen
19
20
21
class LanguageServer:
22
    def __init__(self, temp_file_path: Path):
23
        self._message_id = 0
24
25
        # The pyright-langserver needs the ehrql repo directory on
26
        # PYTHONPATH so it can understand ehrql, and it needs the current
27
        # location of the python executable (also where pyright-langserver)
28
        # is on its PATH variable
29
        env = os.environ.copy()
30
        env["PYTHONPATH"] = Path("").absolute().as_uri()
31
        env["PATH"] = os.path.dirname(sys.executable)
32
33
        self._language_server = Popen(
34
            ["pyright-langserver", "--stdio"],
35
            stdin=PIPE,
36
            stdout=PIPE,
37
            stderr=PIPE,
38
            env=env,
39
        )
40
        # Server immediately emits two log messages:
41
        #   {'method': 'window/logMessage', 'params': { 'message': 'Pyright language server 1.1.396 starting'}}
42
        #   {'method': 'window/logMessage', 'params': { 'message': 'Server root directory: file:...'}}
43
        self._read_messages(2)
44
45
        # Send an "initialize" message
46
        self._send_message(
47
            "initialize",
48
            {
49
                "processId": os.getpid(),
50
                "rootUri": Path("").absolute().as_uri(),
51
                "capabilities": {
52
                    "textDocument": {
53
                        "hover": {
54
                            # Can also pass in "markdown" in this list as well as "plaintext"
55
                            # markdown is what vscode uses, but for testing purposes it's
56
                            # slightly easier to compare the plain text results
57
                            "contentFormat": ["plaintext"],
58
                        },
59
                    },
60
                },
61
            },
62
        )
63
64
        # Confirm with "initialized" notification
65
        self._send_notification("initialized", {})
66
        # Need to _send these notification as well
67
        self._send_notification("workspace/didChangeConfiguration", {})
68
69
        # Now we create a temp file which we will amend in each test and then
70
        # call the language server to get completions. For reasons I can't quite
71
        # figure out, the initial content needs to have a second, non-empty line
72
        # otherwise some of the tests fail
73
        self.open_doc(temp_file_path, "Line 1\nLine 2")
74
75
        # Server now emits a number of "window/logMessage" messages (was 6 prior to
76
        # pyright v1.1.393, but is now 7), finishing with a
77
        # "textDocument/publishDiagnostics" message. So we read the messages until
78
        # we get to that one.
79
        self._read_until_specific_method("textDocument/publishDiagnostics")
80
81
        # The server is now ready for completion and hover requests
82
83
    def _next_id(self):
84
        self._message_id += 1
85
        return self._message_id
86
87
    def get_completion_results_from_file(self, line, cursor_position):
88
        """
89
        For a given line and cursor_position returns the list of potential
90
        completion results. This assumes that the file is already loaded by
91
        the language server (via `open_doc`). If you want to test a single line
92
        of code, then use the `get_completion_results()` method instead
93
        """
94
        completion_response = self._get_completion(line, cursor_position)
95
        results = completion_response.get("result")
96
        items = results.get("items")
97
        return items
98
99
    def get_completion_results(
100
        self, text_for_completion, cursor_position=None
101
    ):  # pragma: no cover
102
        """
103
        For a given string of text provide the list of potential completion
104
        results. If cursor_position is omitted, then it looks for completion
105
        at the end of the text provided.
106
107
        To use, you should provide the entire file contents up to the point
108
        that you want to check autocomplete. It should all be on a single
109
        line, with ';' separators.
110
111
        Returns a list of completion items which look like this:
112
        { "label": str, "kind": CompletionKind, "sortText": str}
113
        """
114
        self._notify_document_change(0, text_for_completion)
115
116
        if cursor_position is None:
117
            cursor_position = len(text_for_completion)
118
119
        return self.get_completion_results_from_file(0, cursor_position)
120
121
    def get_element_type_from_file(self, line, cursor_position):
122
        """
123
        For a given line and cursor_position returns the type of the currently
124
        hovered piece of text. This assumes that the file is already loaded by
125
        the language server (via `open_doc`). If you want to test a single line
126
        of code, then use the `get_element_type()` method instead
127
        """
128
        hover_response = self._get_hover_text(line, cursor_position)
129
        value = hover_response.get("result").get("contents").get("value")
130
        first_line = value.split("\n\n")[0]
131
132
        # First line contains the signature like `(variable) name: type`
133
        variable_signature = re.search(
134
            "^\\((?:variable|property)\\) (?P<var_name>[^:]+): (?P<type>.+)",
135
            first_line,
136
        )
137
138
        # First line contains the signature like `(method) def method_name() -> type`
139
        method_signature = re.search(
140
            "^\\(method\\) def (?P<method_name>[^(]+)\\([^()]*\\) -> (?P<type>.+)",
141
            first_line,
142
        )
143
144
        if variable_signature:
145
            thing_type = variable_signature.group("type")
146
        elif method_signature:
147
            thing_type = method_signature.group("type")
148
        else:  # pragma: no cover
149
            assert 0, (
150
                f"The type signature `{value}` could not be parsed by self._language_server.py."
151
            )
152
153
        return thing_type
154
155
    def get_element_type(self, text, cursor_position=None):  # pragma: no cover
156
        """
157
        For a given string of text provide the inferred type of the item at the
158
        current cursor_position. If cursor_position is omitted, it assumes the
159
        thing to check is the last thing typed and so looks at the cursor position
160
        just before the end of the string.
161
162
        To use, you should provide the entire file contents up to the point
163
        that you want to get the type. It should all be on a single
164
        line, with ';' separators.
165
        """
166
        self._notify_document_change(0, f"{text}\n")
167
168
        if cursor_position is None:
169
            cursor_position = len(text) - 1
170
171
        return self.get_element_type_from_file(0, cursor_position)
172
173
    def _notify_document_change(self, line_number, new_text):  # pragma: no cover
174
        # Need to update text_document version
175
        self._text_document["version"] = f"v{self._next_id()}"
176
        self._send_notification(
177
            "textDocument/didChange",
178
            {
179
                "textDocument": self._text_document,
180
                "contentChanges": [
181
                    {
182
                        "range": {
183
                            "start": {"line": line_number, "character": 0},
184
                            "end": {
185
                                "line": line_number,
186
                                "character": len(new_text),
187
                            },
188
                        },
189
                        "text": new_text,
190
                    }
191
                ],
192
            },
193
        )
194
        # This notification causes a response to be sent, so we
195
        # need to read it to clear it from stdout
196
        self._read_message()
197
198
    def _get_completion(self, line_number, character):
199
        """
200
        Get the list of completion objects for a given position
201
        """
202
        return self._send_message(
203
            "textDocument/completion",
204
            {
205
                "textDocument": self._text_document,
206
                "position": {"line": line_number, "character": character},
207
            },
208
        )
209
210
    def _get_hover_text(self, line_number, character):
211
        """
212
        Get the inferred type
213
        """
214
        return self._send_message(
215
            "textDocument/hover",
216
            {
217
                "textDocument": self._text_document,
218
                "position": {"line": line_number, "character": character},
219
            },
220
        )
221
222
    def _read_message(self):
223
        """
224
        Read a message from the language server.
225
226
        Message format is a header like "Content-Length: xxx", followed by
227
        \r\n\r\n then the message in JSON RPC.
228
        """
229
        line = self._language_server.stdout.readline().decode("utf-8").strip()
230
        while not line.startswith("Content-Length:"):  # pragma: no cover
231
            line = self._language_server.stdout.readline().decode("utf-8").strip()
232
        content_length = int(line.split(":")[1].strip())
233
        content = self._language_server.stdout.read(content_length + 2).decode("utf-8")
234
        return json.loads(content)
235
236
    def _read_messages(self, number_of_messages=1):
237
        """Read multiple messages from the language server"""
238
        messages = []
239
        while number_of_messages > 0:
240
            messages.append(self._read_message())
241
            number_of_messages -= 1
242
        return messages
243
244
    def _read_until_specific_method(self, method):
245
        """Reads messages from the server until we get the one we're looking for"""
246
        messages = [self._read_message()]
247
        while messages[-1]["method"] != method:
248
            messages.append(self._read_message())
249
        return messages
250
251
    def _send(self, message):
252
        content = json.dumps(message)
253
        message_str = f"Content-Length: {str(len(content))}\r\n\r\n{content}"
254
        self._language_server.stdin.write(message_str.encode())
255
        self._language_server.stdin.flush()
256
257
    def _send_notification(self, method, params):
258
        """Send a notification to the language server - there is no response."""
259
        notification = {
260
            "jsonrpc": "2.0",
261
            "method": method,
262
            "params": params,
263
        }
264
        self._send(notification)
265
266
    def _send_message(self, method, params):
267
        """Send a message to the language server and return the response."""
268
        message = {
269
            "jsonrpc": "2.0",
270
            "id": self._next_id(),
271
            "method": method,
272
            "params": params,
273
        }
274
        self._send(message)
275
        return self._read_message()
276
277
    def open_doc(self, temp_file_path, content):
278
        """
279
        Tell the language server about a file we have "opened" so that
280
        it can parse it to make the autocomplete responses faster
281
        """
282
        temp_file_path.write_text(content, encoding="utf-8")
283
        temp_file_uri = temp_file_path.absolute().as_uri()
284
285
        self._text_document = {
286
            "uri": temp_file_uri,
287
            "languageId": "python",
288
            "version": 1,
289
            "text": content,
290
        }
291
        self._send_notification(
292
            "textDocument/didOpen",
293
            {"textDocument": self._text_document},
294
        )
295
296
        self._read_message()