Skip to content

Commit 32ab91e

Browse files
committed
resolver
1 parent 259402a commit 32ab91e

File tree

5 files changed

+624
-6
lines changed

5 files changed

+624
-6
lines changed

etc/docker-compose-2-members.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ services:
1919
- "9612:9612"
2020
- "8080:8080"
2121
- "6676:6676"
22+
- "7574:7574"
2223
volumes:
2324
- .:/args
2425
- ./cert:/certs

src/nslookup.ts

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
import * as net from 'net';
9+
import { promisify } from 'util';
10+
import { randomBytes } from 'crypto';
11+
12+
// Constants
13+
const DEFAULT_PORT = 7574;
14+
const DEFAULT_HOST = 'localhost';
15+
const CLUSTER_NAME_LOOKUP = 'Cluster/name';
16+
const CLUSTER_INFO_LOOKUP = 'Cluster/info';
17+
const CLUSTER_FOREIGN_LOOKUP = 'Cluster/foreign';
18+
const MANAGEMENT_LOOKUP = 'management/HTTPManagementURL';
19+
const JMX_LOOKUP = 'management/JMXServiceURL';
20+
const METRICS_LOOKUP = 'metrics/HTTPMetricsURL';
21+
const GRPC_PROXY_LOOKUP = '$GRPC:GrpcProxy';
22+
const NS_PREFIX = 'NameService/string/';
23+
const NS_LOCAL_PORT = '/NameService/localPort';
24+
const DEFAULT_TIMEOUT = 10;
25+
26+
const MULTIPLEXED_SOCKET = Buffer.from([90, 193, 224, 0]);
27+
const NAME_SERVICE_SUB_PORT = Buffer.from([0, 0, 0, 3]);
28+
const CONNECTION_OPEN = Buffer.from([
29+
0, 1, 2, 0, 66, 0, 1, 14, 0, 0, 66, 166, 182, 159, 222, 178, 81,
30+
1, 65, 227, 243, 228, 221, 15, 2, 65, 143, 246, 186, 153, 1, 3,
31+
65, 248, 180, 229, 242, 4, 4, 65, 196, 254, 220, 245, 5, 5, 65, 215,
32+
206, 195, 141, 7, 6, 65, 219, 137, 220, 213, 10, 64, 2, 110, 3,
33+
93, 78, 87, 2, 17, 77, 101, 115, 115, 97, 103, 105, 110, 103, 80,
34+
114, 111, 116, 111, 99, 111, 108, 2, 65, 2, 65, 2, 19, 78, 97, 109,
35+
101, 83, 101, 114, 118, 105, 99, 101, 80, 114, 111, 116, 111, 99,
36+
111, 108, 2, 65, 1, 65, 1, 5, 160, 2, 0, 0, 14, 0, 0, 66, 174, 137,
37+
158, 222, 178, 81, 1, 65, 129, 128, 128, 240, 15, 5, 65, 152, 159,
38+
129, 128, 8, 6, 65, 147, 158, 1, 64, 1, 106, 2, 110, 3, 106, 4, 113,
39+
5, 113, 6, 78, 8, 67, 108, 117, 115, 116, 101, 114, 66, 9, 78, 9, 108,
40+
111, 99, 97, 108, 104, 111, 115, 116, 10, 78, 5, 50, 48, 50, 51, 51, 12,
41+
78, 16, 67, 111, 104, 101, 114, 101, 110, 99, 101, 67, 111, 110, 115,
42+
111, 108, 101, 64, 64,
43+
]);
44+
45+
const CHANNEL_OPEN = Buffer.from([
46+
0, 11, 2, 0, 66, 1, 1, 78, 19, 78, 97, 109, 101, 83, 101, 114, 118,
47+
105, 99, 101, 80, 114, 111, 116, 111, 99, 111, 108, 2, 78, 11, 78,
48+
97, 109, 101, 83, 101, 114, 118, 105, 99, 101, 64,
49+
]);
50+
51+
const NS_LOOKUP_REQ_ID = Buffer.from([1, 1, 0, 66, 0, 1, 78]);
52+
const REQ_END_MARKER = Buffer.from([64]);
53+
54+
function writePackedInt(n: number): Buffer {
55+
let result: Buffer = Buffer.alloc(0);
56+
let b = 0;
57+
58+
if (n < 0) {
59+
b = 0x40;
60+
n = ~n; // bitwise negation
61+
}
62+
63+
b |= n & 0x3F;
64+
n >>= 6;
65+
66+
while (n !== 0) {
67+
result = Buffer.concat([result, Buffer.from([b | 0x80])]);
68+
b = n & 0x7F;
69+
n >>= 7;
70+
}
71+
72+
result = Buffer.concat([result, Buffer.from([b])]);
73+
return result;
74+
}
75+
76+
77+
// Helper function to parse results from a string response
78+
function parseResults(results: string): string[] {
79+
if (results.charAt(0) == '[' && results.charAt(results.length -1 ) == ']')
80+
results = results.trim().slice(1, -1); // Remove brackets
81+
return results ? results.split(', ') : [];
82+
}
83+
84+
function sleep(ms: number) {
85+
return new Promise((resolve) => {
86+
setTimeout(resolve, ms);
87+
});
88+
}
89+
90+
// Class representing a discovered cluster
91+
class DiscoveredCluster {
92+
cluster_name: string = '';
93+
connection_name: string = '';
94+
ns_port: number = 0;
95+
host: string = '';
96+
management_urls: string[] = [];
97+
selected_url: string = '';
98+
metrics_urls: string[] = [];
99+
jmx_urls: string[] = [];
100+
grpc_proxy_endpoints: string[] = [];
101+
}
102+
103+
// Class representing a ClusterNSPort
104+
class ClusterNSPort {
105+
host_name: string = '';
106+
cluster_name: string = '';
107+
port: number = 0;
108+
is_local: boolean = false;
109+
110+
constructor(cluster_name = '', port = 0, host_name = '', is_local = false) {
111+
this.host_name = host_name;
112+
this.cluster_name = cluster_name;
113+
this.port = port;
114+
this.is_local = is_local;
115+
}
116+
}
117+
118+
// Class handling the asynchronous network operations
119+
class AsyncNSLookup {
120+
host: string;
121+
port: number;
122+
channel: Buffer;
123+
socket: net.Socket | null;
124+
reader: net.Socket | null;
125+
writer: net.Socket | null;
126+
127+
constructor(host: string = DEFAULT_HOST, port: number = DEFAULT_PORT) {
128+
this.host = host;
129+
this.port = port;
130+
this.channel = Buffer.alloc(0);
131+
this.socket = null;
132+
this.reader = null;
133+
this.writer = null;
134+
}
135+
136+
// Static function to open a connection
137+
static async open(hostPort: string = '', timeout: number = DEFAULT_TIMEOUT): Promise<AsyncNSLookup> {
138+
const nslookup = new AsyncNSLookup();
139+
if (hostPort) {
140+
const [host, port] = hostPort.split(':');
141+
nslookup.host = host;
142+
nslookup.port = port ? parseInt(port) : DEFAULT_PORT;
143+
}
144+
await nslookup.connect();
145+
return nslookup;
146+
}
147+
148+
// Connect to the server
149+
async connect(): Promise<void> {
150+
this.socket = new net.Socket();
151+
this.socket.setNoDelay(true)
152+
this.socket.setTimeout(DEFAULT_TIMEOUT * 1000); // Set timeout
153+
this.socket.connect(this.port, this.host)
154+
155+
this.writer = this.socket;
156+
this.reader = this.socket;
157+
158+
this.writer.write(MULTIPLEXED_SOCKET);
159+
this.writer.write(NAME_SERVICE_SUB_PORT);
160+
this.writer.write(writePackedInt(CONNECTION_OPEN.length));
161+
this.writer.write(CONNECTION_OPEN);
162+
this.writer.write(writePackedInt(CHANNEL_OPEN.length));
163+
this.writer.write(CHANNEL_OPEN);
164+
165+
await sleep(1000)
166+
167+
await this.readResponse()
168+
const data = await this.readResponse();
169+
170+
this.channel = data.slice(8, 8 + data.length - 9);
171+
}
172+
173+
// Perform a lookup
174+
async lookup(name: string): Promise<string> {
175+
const response = await this.lookupInternal(name);
176+
if (response.length <= 7) {
177+
return '';
178+
}
179+
return this.readString(response);
180+
}
181+
182+
// Internal lookup function
183+
private async lookupInternal(name: string): Promise<Buffer> {
184+
const request = Buffer.concat([
185+
this.channel,
186+
NS_LOOKUP_REQ_ID,
187+
writePackedInt(name.length),
188+
Buffer.from(name),
189+
REQ_END_MARKER,
190+
]);
191+
192+
this.writer?.write(writePackedInt(request.length));
193+
this.writer?.write(request);
194+
//await this.writer?.flush();
195+
196+
await sleep(1000)
197+
198+
const response = await this.readResponse();
199+
console.log("### DEBUG DATA -> " + response.toString())
200+
return response.slice(this.channel.length + 1);
201+
}
202+
203+
// Read response from the socket
204+
private async readResponse(): Promise<Buffer> {
205+
const length = await this.readPackedInt();
206+
const data: any = this.reader?.read(length)
207+
//const data = await promisify(this.reader?.read.bind(this.reader))(length);
208+
return data!;
209+
}
210+
211+
// Read packed integer
212+
private async readPackedInt(): Promise<number> {
213+
let bits = 6;
214+
215+
let byte = await this.reader?.read(1);
216+
let byteValue = byte ? byte[0] : 0;
217+
let negative: boolean = (byteValue & 0x40) !== 0
218+
let result = (byteValue & 0x3F)
219+
220+
while ((byteValue & 0x80) !== 0) {
221+
byte = await this.reader?.read(1);
222+
byteValue = byte ? byte[0] : 0;
223+
result |= (byteValue & 0x7F) << bits;
224+
bits += 7;
225+
}
226+
227+
if (negative) {
228+
result = ~result;
229+
}
230+
return result;
231+
}
232+
233+
private readPackedIntFromString(bytes: Buffer): [number, number] {
234+
let bits = 6;
235+
236+
let byteValue = bytes ? bytes[6] : 0;
237+
let negative: boolean = (byteValue & 0x40) !== 0
238+
let result = (byteValue & 0x3F)
239+
240+
while ((byteValue & 0x80) !== 0) {
241+
byteValue = bytes ? bytes[7] : 0;
242+
result |= (byteValue & 0x7F) << bits;
243+
bits += 7;
244+
}
245+
246+
if (negative) {
247+
result = ~result;
248+
}
249+
return [result, bits];
250+
}
251+
252+
// Read a string from the response
253+
private readString(data: Buffer): string {
254+
console.log("### DEBUG DATA -> " + data)
255+
let [len, pos] = this.readPackedIntFromString(data)
256+
return data.slice(7 + (pos / 7), 7 + (pos / 7) + len).toString();
257+
}
258+
259+
// Close the socket connection
260+
async close(): Promise<void> {
261+
if (this.writer) {
262+
this.writer.end();
263+
}
264+
}
265+
266+
// Discover cluster information
267+
async discoverClusterInfo(): Promise<DiscoveredCluster> {
268+
const cluster = new DiscoveredCluster();
269+
cluster.ns_port = this.port;
270+
cluster.host = this.host;
271+
272+
//cluster.cluster_name = await this.lookup(CLUSTER_NAME_LOOKUP);
273+
//cluster.management_urls = parseResults(await this.lookup(NS_PREFIX + MANAGEMENT_LOOKUP));
274+
//cluster.jmx_urls = parseResults(await this.lookup(NS_PREFIX + JMX_LOOKUP));
275+
//cluster.metrics_urls = parseResults(await this.lookup(NS_PREFIX + METRICS_LOOKUP));
276+
cluster.grpc_proxy_endpoints = parseResults(await this.lookup(NS_PREFIX + GRPC_PROXY_LOOKUP));
277+
278+
return cluster;
279+
}
280+
281+
// Discover name service ports
282+
async discoverNameServicePorts(): Promise<ClusterNSPort[]> {
283+
const localCluster = await this.lookup(CLUSTER_NAME_LOOKUP);
284+
const otherClusters = await this.lookup(NS_PREFIX + CLUSTER_FOREIGN_LOOKUP);
285+
const otherClustersList = parseResults(otherClusters);
286+
287+
const clusterNames = [localCluster, ...otherClustersList];
288+
const listClusters = clusterNames.map((name) => {
289+
return new ClusterNSPort(name, name === localCluster ? this.port : 0, this.host, name === localCluster);
290+
});
291+
292+
// Determine port for other clusters
293+
for (const clusterNSPort of listClusters.slice(1)) {
294+
clusterNSPort.port = parseInt(await this.lookup(`${NS_PREFIX}${CLUSTER_FOREIGN_LOOKUP}/${clusterNSPort.cluster_name}${NS_LOCAL_PORT}`));
295+
}
296+
297+
return listClusters;
298+
}
299+
300+
// Resolve NS lookup address
301+
static async resolveNSLookupAddress(name: string): Promise<string> {
302+
let nslookup = null;
303+
let clusterInfo = null;
304+
305+
try {
306+
nslookup = await AsyncNSLookup.open(name);
307+
clusterInfo = await nslookup.discoverClusterInfo();
308+
} catch (error) {
309+
console.error('Error:', error);
310+
} finally {
311+
if (nslookup !== null) {
312+
await nslookup.close();
313+
}
314+
}
315+
316+
if (clusterInfo) {
317+
const pairs = clusterInfo.grpc_proxy_endpoints
318+
.map((e, i, arr) => (i % 2 === 0 ? `${e}:${arr[i + 1]}` : null))
319+
.filter(Boolean);
320+
321+
return randomBytes(pairs.length).toString();
322+
} else {
323+
return 'localhost:1408';
324+
}
325+
}
326+
}
327+
328+
// Main function to test the connection
329+
async function main() {
330+
try {
331+
const nslookup = await AsyncNSLookup.open('localhost:7574');
332+
console.log(`Connected to ${nslookup.host}:${nslookup.port}`);
333+
334+
// Example: Perform a lookup
335+
const clusterInfo = await nslookup.discoverClusterInfo();
336+
console.log(`Cluster Name: ${clusterInfo.cluster_name}`);
337+
console.log(`Management URLs: ${clusterInfo.management_urls}`);
338+
console.log(`JMX URLs: ${clusterInfo.jmx_urls}`);
339+
console.log(`Metrics URLs: ${clusterInfo.metrics_urls}`);
340+
console.log(`GRPC Endpoints: ${clusterInfo.grpc_proxy_endpoints}`);
341+
342+
const nsPorts = await nslookup.discoverNameServicePorts();
343+
nsPorts.forEach((portInfo) => {
344+
console.log(`Cluster Name: ${portInfo.cluster_name}, Port: ${portInfo.port}, Host: ${portInfo.host_name}, Is Local: ${portInfo.is_local}`);
345+
});
346+
347+
} catch (error) {
348+
console.error('Error:', error);
349+
}
350+
process.exit(0)
351+
}
352+
353+
// Run the main function
354+
main();

src/session.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import {event} from './events'
1313
import {NamedCache, NamedCacheClient, NamedMap} from './named-cache-client'
1414
import {util} from './util'
1515
import {ConnectivityState} from "@grpc/grpc-js/build/src/connectivity-state";
16+
import {registerResolver} from "@grpc/grpc-js/build/src/resolver";
17+
import CoherenceResolver = util.CoherenceResolver;
1618

1719
/**
1820
* Supported {@link Session} options.
@@ -100,9 +102,9 @@ export class Options {
100102
return;
101103
}
102104
// ensure address is sane
103-
if (!Options.ADDRESS_REGEXP.test(address)) {
104-
throw new Error('Expected address format is \'<hostname>:<port>\'. Configured: ' + address)
105-
}
105+
// if (!Options.ADDRESS_REGEXP.test(address)) {
106+
// throw new Error('Expected address format is \'<hostname>:<port>\'. Configured: ' + address)
107+
// }
106108

107109
this._address = address
108110
}
@@ -507,6 +509,9 @@ export class Session
507509
*/
508510
constructor (sessionOptions?: Options | object) {
509511
super()
512+
513+
registerResolver("coherence", CoherenceResolver)
514+
510515
if (sessionOptions) {
511516
this._sessionOptions = Object.assign(new Options(), sessionOptions)
512517
// @ts-ignore -- added for 'tls' index access

0 commit comments

Comments
 (0)