Skip to content

Commit db3a670

Browse files
davemooreuwstjholm
andauthored
feat: batch job support (#231)
Co-authored-by: Tim Holm <[email protected]>
1 parent 6bc461c commit db3a670

File tree

22 files changed

+3925
-13
lines changed

22 files changed

+3925
-13
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@nitric/sdk",
33
"description": "Nitric NodeJS client sdk",
4-
"nitric": "v1.6.0",
4+
"nitric": "v1.14.0",
55
"author": "Nitric <https://github.com/nitrictech>",
66
"repository": "https://github.com/nitrictech/node-sdk",
77
"main": "lib/index.js",

src/api/batch/v1/batch.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2021, Nitric Technologies Pty Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
import { SERVICE_BIND } from '@nitric/sdk/constants';
15+
import { BatchClient } from '@nitric/sdk/gen/nitric/proto/batch/v1/batch_grpc_pb';
16+
import * as grpc from '@grpc/grpc-js';
17+
18+
let batchClient: BatchClient;
19+
20+
export const getBatchClient = (): BatchClient => {
21+
if (!batchClient) {
22+
batchClient = new BatchClient(
23+
SERVICE_BIND,
24+
grpc.ChannelCredentials.createInsecure()
25+
);
26+
}
27+
return batchClient;
28+
};

src/api/batch/v1/index.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright 2021, Nitric Technologies Pty Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
export * from './job';

src/api/batch/v1/job.test.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2021, Nitric Technologies Pty Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
import { Job } from './job';
15+
import { UnimplementedError } from '../../errors';
16+
import { status } from '@grpc/grpc-js';
17+
import { getBatchClient } from './batch';
18+
import { JobSubmitResponse } from '@nitric/sdk/gen/nitric/proto/batch/v1/batch_pb';
19+
import { BatchClient } from '@nitric/sdk/gen/nitric/proto/batch/v1/batch_grpc_pb';
20+
21+
describe('Job Client Tests', () => {
22+
describe('Given the grpc client returns an unimplemented error status', () => {
23+
const MOCK_ERROR = {
24+
code: status.UNIMPLEMENTED,
25+
message: 'UNIMPLEMENTED',
26+
};
27+
let submitMock;
28+
beforeAll(() => {
29+
submitMock = jest
30+
.spyOn(BatchClient.prototype, 'submitJob')
31+
.mockImplementation((request, callback: any) => {
32+
callback(MOCK_ERROR, null);
33+
return null as any;
34+
});
35+
});
36+
afterAll(() => {
37+
jest.resetAllMocks();
38+
});
39+
test('Then submit call should return an UnimplementedError', async () => {
40+
const job = new Job('test', getBatchClient());
41+
await expect(
42+
job.submit({
43+
test: 'test',
44+
})
45+
).rejects.toBeInstanceOf(UnimplementedError);
46+
});
47+
test('The Grpc client for Job.submit should have been called exactly once', () => {
48+
expect(submitMock).toHaveBeenCalledTimes(1);
49+
});
50+
});
51+
describe('Given the grpc returns successfully', () => {
52+
let submitMock;
53+
beforeAll(() => {
54+
submitMock = jest
55+
.spyOn(BatchClient.prototype, 'submitJob')
56+
.mockImplementation((request, callback: any) => {
57+
const response = new JobSubmitResponse();
58+
callback(null, response);
59+
return null as any;
60+
});
61+
});
62+
afterAll(() => {
63+
jest.resetAllMocks();
64+
});
65+
test('Then Eventing.submit should resolve with the provided id', async () => {
66+
const client = new Job('test', getBatchClient());
67+
await expect(
68+
client.submit({ message: 'Test Payload' })
69+
).resolves.toBeUndefined();
70+
});
71+
test('The Grpc client for Eventing.submit should have been called exactly once', () => {
72+
expect(submitMock).toHaveBeenCalledTimes(1);
73+
});
74+
});
75+
});

src/api/batch/v1/job.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2021, Nitric Technologies Pty Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
import { BatchClient } from '@nitric/proto/batch/v1/batch_grpc_pb';
15+
import { JobData, JobSubmitRequest } from '@nitric/proto/batch/v1/batch_pb';
16+
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
17+
import { fromGrpcError } from '../../errors';
18+
19+
export class Job<T extends Record<string, any> = Record<string, any>> {
20+
private name: string;
21+
private client: BatchClient;
22+
23+
constructor(name: string, client: BatchClient) {
24+
this.name = name;
25+
this.client = client;
26+
}
27+
28+
/**
29+
* Submit a job to the batch service
30+
*
31+
* @example
32+
* ```typescript
33+
* const analyse = job('analyse').allow('submit');
34+
*
35+
* await analyse.submit({
36+
* data: 'some data',
37+
* });
38+
* ```
39+
*
40+
* @param data - Data to submit to the job
41+
* @returns Promise that resolves when the job has been submitted
42+
*/
43+
async submit(data: T): Promise<void> {
44+
const request = new JobSubmitRequest();
45+
const jobData = new JobData();
46+
47+
jobData.setStruct(Struct.fromJavaScript(data));
48+
request.setJobName(this.name);
49+
request.setData(jobData);
50+
51+
return new Promise<void>((resolve, reject) => {
52+
this.client.submitJob(request, (error, _response) => {
53+
if (error) {
54+
reject(fromGrpcError(error));
55+
} else {
56+
resolve();
57+
}
58+
});
59+
});
60+
}
61+
}

src/context/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ export * from './http';
1717
export * from './interval';
1818
export * from './message';
1919
export * from './websocket';
20+
export * from './job';

src/context/job.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright 2021, Nitric Technologies Pty Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
import {
15+
JobRequest as JobRequestPb,
16+
JobResponse as JobResponsePb,
17+
} from '../gen/nitric/proto/batch/v1/batch_pb';
18+
import { AbstractRequest, BaseContext } from './base';
19+
20+
export interface JobResponse {
21+
success: boolean;
22+
}
23+
24+
export class JobRequest extends AbstractRequest {
25+
public readonly jobName: string;
26+
27+
constructor(data: string | Uint8Array, jobName: string) {
28+
super(data);
29+
this.jobName = jobName;
30+
}
31+
}
32+
33+
export class JobContext extends BaseContext<JobRequest, JobResponse> {
34+
public get job(): JobContext {
35+
return this;
36+
}
37+
38+
static fromJobRequest(jobRequest: JobRequestPb): JobContext {
39+
const ctx = new JobContext();
40+
const jobName = jobRequest.getJobName();
41+
42+
const data = jobRequest.getData().getStruct().toJavaScript();
43+
44+
ctx.request = new JobRequest(JSON.stringify(data), jobName);
45+
46+
ctx.response = {
47+
success: true,
48+
};
49+
50+
return ctx;
51+
}
52+
53+
static toJobResponse(ctx: JobContext): JobResponsePb {
54+
const evtCtx = ctx.job;
55+
const jobResponse = new JobResponsePb();
56+
jobResponse.setSuccess(evtCtx.res.success);
57+
58+
return jobResponse;
59+
}
60+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// GENERATED CODE -- DO NOT EDIT!
2+
3+
// package: nitric.proto.batch.v1
4+
// file: nitric/proto/batch/v1/batch.proto
5+
6+
import * as nitric_proto_batch_v1_batch_pb from "../../../../nitric/proto/batch/v1/batch_pb";
7+
import * as grpc from "@grpc/grpc-js";
8+
9+
interface IJobService extends grpc.ServiceDefinition<grpc.UntypedServiceImplementation> {
10+
handleJob: grpc.MethodDefinition<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
11+
}
12+
13+
export const JobService: IJobService;
14+
15+
export interface IJobServer extends grpc.UntypedServiceImplementation {
16+
handleJob: grpc.handleBidiStreamingCall<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
17+
}
18+
19+
export class JobClient extends grpc.Client {
20+
constructor(address: string, credentials: grpc.ChannelCredentials, options?: object);
21+
handleJob(metadataOrOptions?: grpc.Metadata | grpc.CallOptions | null): grpc.ClientDuplexStream<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
22+
handleJob(metadata?: grpc.Metadata | null, options?: grpc.CallOptions | null): grpc.ClientDuplexStream<nitric_proto_batch_v1_batch_pb.ClientMessage, nitric_proto_batch_v1_batch_pb.ServerMessage>;
23+
}
24+
25+
interface IBatchService extends grpc.ServiceDefinition<grpc.UntypedServiceImplementation> {
26+
submitJob: grpc.MethodDefinition<nitric_proto_batch_v1_batch_pb.JobSubmitRequest, nitric_proto_batch_v1_batch_pb.JobSubmitResponse>;
27+
}
28+
29+
export const BatchService: IBatchService;
30+
31+
export interface IBatchServer extends grpc.UntypedServiceImplementation {
32+
submitJob: grpc.handleUnaryCall<nitric_proto_batch_v1_batch_pb.JobSubmitRequest, nitric_proto_batch_v1_batch_pb.JobSubmitResponse>;
33+
}
34+
35+
export class BatchClient extends grpc.Client {
36+
constructor(address: string, credentials: grpc.ChannelCredentials, options?: object);
37+
submitJob(argument: nitric_proto_batch_v1_batch_pb.JobSubmitRequest, callback: grpc.requestCallback<nitric_proto_batch_v1_batch_pb.JobSubmitResponse>): grpc.ClientUnaryCall;
38+
submitJob(argument: nitric_proto_batch_v1_batch_pb.JobSubmitRequest, metadataOrOptions: grpc.Metadata | grpc.CallOptions | null, callback: grpc.requestCallback<nitric_proto_batch_v1_batch_pb.JobSubmitResponse>): grpc.ClientUnaryCall;
39+
submitJob(argument: nitric_proto_batch_v1_batch_pb.JobSubmitRequest, metadata: grpc.Metadata | null, options: grpc.CallOptions | null, callback: grpc.requestCallback<nitric_proto_batch_v1_batch_pb.JobSubmitResponse>): grpc.ClientUnaryCall;
40+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// GENERATED CODE -- DO NOT EDIT!
2+
3+
'use strict';
4+
var grpc = require('@grpc/grpc-js');
5+
var nitric_proto_batch_v1_batch_pb = require('../../../../nitric/proto/batch/v1/batch_pb.js');
6+
var google_protobuf_struct_pb = require('google-protobuf/google/protobuf/struct_pb.js');
7+
8+
function serialize_nitric_proto_batch_v1_ClientMessage(arg) {
9+
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.ClientMessage)) {
10+
throw new Error('Expected argument of type nitric.proto.batch.v1.ClientMessage');
11+
}
12+
return Buffer.from(arg.serializeBinary());
13+
}
14+
15+
function deserialize_nitric_proto_batch_v1_ClientMessage(buffer_arg) {
16+
return nitric_proto_batch_v1_batch_pb.ClientMessage.deserializeBinary(new Uint8Array(buffer_arg));
17+
}
18+
19+
function serialize_nitric_proto_batch_v1_JobSubmitRequest(arg) {
20+
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.JobSubmitRequest)) {
21+
throw new Error('Expected argument of type nitric.proto.batch.v1.JobSubmitRequest');
22+
}
23+
return Buffer.from(arg.serializeBinary());
24+
}
25+
26+
function deserialize_nitric_proto_batch_v1_JobSubmitRequest(buffer_arg) {
27+
return nitric_proto_batch_v1_batch_pb.JobSubmitRequest.deserializeBinary(new Uint8Array(buffer_arg));
28+
}
29+
30+
function serialize_nitric_proto_batch_v1_JobSubmitResponse(arg) {
31+
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.JobSubmitResponse)) {
32+
throw new Error('Expected argument of type nitric.proto.batch.v1.JobSubmitResponse');
33+
}
34+
return Buffer.from(arg.serializeBinary());
35+
}
36+
37+
function deserialize_nitric_proto_batch_v1_JobSubmitResponse(buffer_arg) {
38+
return nitric_proto_batch_v1_batch_pb.JobSubmitResponse.deserializeBinary(new Uint8Array(buffer_arg));
39+
}
40+
41+
function serialize_nitric_proto_batch_v1_ServerMessage(arg) {
42+
if (!(arg instanceof nitric_proto_batch_v1_batch_pb.ServerMessage)) {
43+
throw new Error('Expected argument of type nitric.proto.batch.v1.ServerMessage');
44+
}
45+
return Buffer.from(arg.serializeBinary());
46+
}
47+
48+
function deserialize_nitric_proto_batch_v1_ServerMessage(buffer_arg) {
49+
return nitric_proto_batch_v1_batch_pb.ServerMessage.deserializeBinary(new Uint8Array(buffer_arg));
50+
}
51+
52+
53+
// Service for processing jobs
54+
var JobService = exports.JobService = {
55+
handleJob: {
56+
path: '/nitric.proto.batch.v1.Job/HandleJob',
57+
requestStream: true,
58+
responseStream: true,
59+
requestType: nitric_proto_batch_v1_batch_pb.ClientMessage,
60+
responseType: nitric_proto_batch_v1_batch_pb.ServerMessage,
61+
requestSerialize: serialize_nitric_proto_batch_v1_ClientMessage,
62+
requestDeserialize: deserialize_nitric_proto_batch_v1_ClientMessage,
63+
responseSerialize: serialize_nitric_proto_batch_v1_ServerMessage,
64+
responseDeserialize: deserialize_nitric_proto_batch_v1_ServerMessage,
65+
},
66+
};
67+
68+
exports.JobClient = grpc.makeGenericClientConstructor(JobService);
69+
// Service for submitting jobs to be processed
70+
var BatchService = exports.BatchService = {
71+
submitJob: {
72+
path: '/nitric.proto.batch.v1.Batch/SubmitJob',
73+
requestStream: false,
74+
responseStream: false,
75+
requestType: nitric_proto_batch_v1_batch_pb.JobSubmitRequest,
76+
responseType: nitric_proto_batch_v1_batch_pb.JobSubmitResponse,
77+
requestSerialize: serialize_nitric_proto_batch_v1_JobSubmitRequest,
78+
requestDeserialize: deserialize_nitric_proto_batch_v1_JobSubmitRequest,
79+
responseSerialize: serialize_nitric_proto_batch_v1_JobSubmitResponse,
80+
responseDeserialize: deserialize_nitric_proto_batch_v1_JobSubmitResponse,
81+
},
82+
};
83+
84+
exports.BatchClient = grpc.makeGenericClientConstructor(BatchService);

0 commit comments

Comments
 (0)