new-row.ts•5.47 kB
import {
  createTrigger,
  TriggerStrategy,
  PiecePropValueSchema,
  Property,
} from '@activepieces/pieces-framework';
import {
  DedupeStrategy,
  Polling,
  pollingHelper,
} from '@activepieces/pieces-common';
import dayjs from 'dayjs';
import { surrealdbAuth } from '../..';
import client from '../common';
import crypto from 'crypto';
// replace auth with piece auth variable
const polling: Polling<
  PiecePropValueSchema<typeof surrealdbAuth>,
  {
    table: string;
    order_by: string;
    order_direction: 'ASC' | 'DESC' | undefined;
  }
> = {
  strategy: DedupeStrategy.LAST_ITEM,
  items: async ({ auth, propsValue, lastItemId }) => {
    const lastItem = lastItemId as string;
    const query = constructQuery({
      order_by: propsValue.order_by,
      lastItem: lastItem,
      order_direction: propsValue.order_direction,
    });
    const authProps = auth as PiecePropValueSchema<typeof surrealdbAuth>;
    const result = await client.query(authProps, query, {
      table: propsValue.table,
    });
    const items = result.body[0].result.map(function (
      row: Record<string, any>
    ) {
      const rowHash = crypto
        .createHash('md5')
        .update(JSON.stringify(row))
        .digest('hex');
      const isTimestamp = dayjs(row[propsValue.order_by]).isValid();
      const orderValue = isTimestamp
        ? dayjs(row[propsValue.order_by]).toISOString()
        : row[propsValue.order_by];
      return {
        id: orderValue + '|' + rowHash,
        data: row,
      };
    });
    return items;
  },
};
function constructQuery({
  order_by,
  lastItem,
  order_direction,
}: {
  order_by: string;
  order_direction: 'ASC' | 'DESC' | undefined;
  lastItem: string;
}): string {
  const lastOrderKey = lastItem ? lastItem.split('|')[0] : null;
  if (lastOrderKey === null) {
    switch (order_direction) {
      case 'ASC':
        return `SELECT * FROM type::table($table) ORDER BY ${order_by} ASC LIMIT 5`;
      case 'DESC':
        return `SELECT * FROM type::table($table) ORDER BY ${order_by} DESC LIMIT 5`;
      default:
        throw new Error(
          JSON.stringify({
            message: 'Invalid order direction',
            order_direction: order_direction,
          })
        );
    }
  } else {
    switch (order_direction) {
      case 'ASC':
        return `SELECT * FROM type::table($table) WHERE ${order_by} <= '${lastOrderKey}' ORDER BY ${order_by} ASC`;
      case 'DESC':
        return `SELECT * FROM type::table($table) WHERE ${order_by} >= '${lastOrderKey}' ORDER BY ${order_by} DESC`;
      default:
        throw new Error(
          JSON.stringify({
            message: 'Invalid order direction',
            order_direction: order_direction,
          })
        );
    }
  }
}
export const newRow = createTrigger({
  name: 'new-row',
  displayName: 'New Row',
  description: 'Triggers when a new row is added to the defined table.',
  props: {
    description: Property.MarkDown({
      value: `**NOTE:** The trigger fetches the latest rows using the provided order by column (newest first), and then will keep polling until the previous last row is reached. It's suggested to add a created_at timestamp. \`DEFINE FIELD OVERWRITE createdAt ON schedule VALUE time::now() READONLY;\``,
    }),
    table: Property.Dropdown({
      displayName: 'Table name',
      required: true,
      refreshers: ['auth'],
      refreshOnSearch: false,
      options: async ({ auth }) => {
        if (!auth) {
          return {
            disabled: true,
            options: [],
            placeholder: 'Please authenticate first',
          };
        }
        const authProps = auth as PiecePropValueSchema<typeof surrealdbAuth>;
        try {
          const result = await client.query(authProps, 'INFO FOR DB');
          const options = Object.keys(result.body[0].result.tables).map(
            (row) => ({
              label: row,
              value: row,
            })
          );
          return {
            disabled: false,
            options,
          };
        } catch (e) {
          return {
            disabled: true,
            options: [],
            placeholder: JSON.stringify(e),
          };
        }
      },
    }),
    order_by: Property.ShortText({
      displayName: 'Column to order by',
      description: 'Use something like a created timestamp.',
      required: true,
      defaultValue: 'created_at',
    }),
    order_direction: Property.StaticDropdown<'ASC' | 'DESC'>({
      displayName: 'Order Direction',
      description:
        'The direction to sort by such that the newest rows are fetched first.',
      required: true,
      options: {
        options: [
          {
            label: 'Ascending',
            value: 'ASC',
          },
          {
            label: 'Descending',
            value: 'DESC',
          },
        ],
      },
      defaultValue: 'DESC',
    }),
  },
  sampleData: {},
  type: TriggerStrategy.POLLING,
  auth: surrealdbAuth,
  async test(context) {
    return await pollingHelper.test(polling, context);
  },
  async onEnable(context) {
    const { store, auth, propsValue } = context;
    await pollingHelper.onEnable(polling, { store, propsValue, auth });
  },
  async onDisable(context) {
    const { store, auth, propsValue } = context;
    await pollingHelper.onDisable(polling, { store, propsValue, auth });
  },
  async run(context) {
    return await pollingHelper.poll(polling, context);
  },
});